Repository: spark
Updated Branches:
  refs/heads/master 019afd9c7 -> 776d183c8


[SPARK-9876][SQL] Update Parquet to 1.8.1.

## What changes were proposed in this pull request?

This includes minimal changes to get Spark using the current release of 
Parquet, 1.8.1.

## How was this patch tested?

This uses the existing Parquet tests.
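For reference, a minimal sketch (not part of this patch) of the two user-visible behaviors the upgrade touches: string/binary filter push-down now goes through the new `Binary.fromString`/`Binary.fromReusedByteArray` APIs, and zero-column projections such as `COUNT(*)` rely on the `EMPTY_MESSAGE` workaround. The `SparkSession`, path, and column names below are illustrative assumptions.

```scala
import org.apache.spark.sql.SparkSession

// Illustrative only: the session, path, and schema are assumptions, not from this commit.
val spark = SparkSession.builder().appName("parquet-1.8.1-check").getOrCreate()
import spark.implicits._

val path = "/tmp/parquet_181_check"
Seq(("a", Array[Byte](1, 2)), ("b", Array[Byte](3))).toDF("s", "bin")
  .write.mode("overwrite").parquet(path)

val t = spark.read.parquet(path)
// String predicate: eligible for Parquet filter push-down, re-enabled by this patch via Binary.fromString.
t.filter($"s" === "a").show()
// No columns selected: exercises the empty-projection path covered by the EMPTY_MESSAGE workaround (PARQUET-363).
println(t.count())
```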

Author: Ryan Blue <b...@apache.org>

Closes #13280 from rdblue/SPARK-9876-update-parquet.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/776d183c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/776d183c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/776d183c

Branch: refs/heads/master
Commit: 776d183c82b424ef7c3cae30537d8afe9b9eee83
Parents: 019afd9
Author: Ryan Blue <b...@apache.org>
Authored: Fri May 27 16:59:38 2016 -0700
Committer: Cheng Lian <l...@databricks.com>
Committed: Fri May 27 16:59:38 2016 -0700

----------------------------------------------------------------------
 dev/deps/spark-deps-hadoop-2.2                  | 11 ++-
 dev/deps/spark-deps-hadoop-2.3                  | 11 ++-
 dev/deps/spark-deps-hadoop-2.4                  | 11 ++-
 dev/deps/spark-deps-hadoop-2.6                  | 11 ++-
 dev/deps/spark-deps-hadoop-2.7                  | 11 ++-
 pom.xml                                         |  2 +-
 .../SpecificParquetRecordReaderBase.java        | 20 +++--
 .../parquet/CatalystReadSupport.scala           | 12 ++-
 .../parquet/CatalystSchemaConverter.scala       | 16 ++++
 .../datasources/parquet/ParquetFilters.scala    | 83 ++++----------------
 .../parquet/ParquetSchemaSuite.scala            | 20 +++--
 11 files changed, 91 insertions(+), 117 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/776d183c/dev/deps/spark-deps-hadoop-2.2
----------------------------------------------------------------------
diff --git a/dev/deps/spark-deps-hadoop-2.2 b/dev/deps/spark-deps-hadoop-2.2
index 578691c..deec033 100644
--- a/dev/deps/spark-deps-hadoop-2.2
+++ b/dev/deps/spark-deps-hadoop-2.2
@@ -129,14 +129,13 @@ opencsv-2.3.jar
 oro-2.0.8.jar
 osgi-resource-locator-1.0.1.jar
 paranamer-2.8.jar
-parquet-column-1.7.0.jar
-parquet-common-1.7.0.jar
-parquet-encoding-1.7.0.jar
+parquet-column-1.8.1.jar
+parquet-common-1.8.1.jar
+parquet-encoding-1.8.1.jar
 parquet-format-2.3.0-incubating.jar
-parquet-generator-1.7.0.jar
-parquet-hadoop-1.7.0.jar
+parquet-hadoop-1.8.1.jar
 parquet-hadoop-bundle-1.6.0.jar
-parquet-jackson-1.7.0.jar
+parquet-jackson-1.8.1.jar
 pmml-model-1.2.15.jar
 pmml-schema-1.2.15.jar
 protobuf-java-2.5.0.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/776d183c/dev/deps/spark-deps-hadoop-2.3
----------------------------------------------------------------------
diff --git a/dev/deps/spark-deps-hadoop-2.3 b/dev/deps/spark-deps-hadoop-2.3
index fc6306f..43c7dd3 100644
--- a/dev/deps/spark-deps-hadoop-2.3
+++ b/dev/deps/spark-deps-hadoop-2.3
@@ -136,14 +136,13 @@ opencsv-2.3.jar
 oro-2.0.8.jar
 osgi-resource-locator-1.0.1.jar
 paranamer-2.8.jar
-parquet-column-1.7.0.jar
-parquet-common-1.7.0.jar
-parquet-encoding-1.7.0.jar
+parquet-column-1.8.1.jar
+parquet-common-1.8.1.jar
+parquet-encoding-1.8.1.jar
 parquet-format-2.3.0-incubating.jar
-parquet-generator-1.7.0.jar
-parquet-hadoop-1.7.0.jar
+parquet-hadoop-1.8.1.jar
 parquet-hadoop-bundle-1.6.0.jar
-parquet-jackson-1.7.0.jar
+parquet-jackson-1.8.1.jar
 pmml-model-1.2.15.jar
 pmml-schema-1.2.15.jar
 protobuf-java-2.5.0.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/776d183c/dev/deps/spark-deps-hadoop-2.4
----------------------------------------------------------------------
diff --git a/dev/deps/spark-deps-hadoop-2.4 b/dev/deps/spark-deps-hadoop-2.4
index dee1417..7186b30 100644
--- a/dev/deps/spark-deps-hadoop-2.4
+++ b/dev/deps/spark-deps-hadoop-2.4
@@ -136,14 +136,13 @@ opencsv-2.3.jar
 oro-2.0.8.jar
 osgi-resource-locator-1.0.1.jar
 paranamer-2.8.jar
-parquet-column-1.7.0.jar
-parquet-common-1.7.0.jar
-parquet-encoding-1.7.0.jar
+parquet-column-1.8.1.jar
+parquet-common-1.8.1.jar
+parquet-encoding-1.8.1.jar
 parquet-format-2.3.0-incubating.jar
-parquet-generator-1.7.0.jar
-parquet-hadoop-1.7.0.jar
+parquet-hadoop-1.8.1.jar
 parquet-hadoop-bundle-1.6.0.jar
-parquet-jackson-1.7.0.jar
+parquet-jackson-1.8.1.jar
 pmml-model-1.2.15.jar
 pmml-schema-1.2.15.jar
 protobuf-java-2.5.0.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/776d183c/dev/deps/spark-deps-hadoop-2.6
----------------------------------------------------------------------
diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6
index 9695661..3e4ed74 100644
--- a/dev/deps/spark-deps-hadoop-2.6
+++ b/dev/deps/spark-deps-hadoop-2.6
@@ -144,14 +144,13 @@ opencsv-2.3.jar
 oro-2.0.8.jar
 osgi-resource-locator-1.0.1.jar
 paranamer-2.8.jar
-parquet-column-1.7.0.jar
-parquet-common-1.7.0.jar
-parquet-encoding-1.7.0.jar
+parquet-column-1.8.1.jar
+parquet-common-1.8.1.jar
+parquet-encoding-1.8.1.jar
 parquet-format-2.3.0-incubating.jar
-parquet-generator-1.7.0.jar
-parquet-hadoop-1.7.0.jar
+parquet-hadoop-1.8.1.jar
 parquet-hadoop-bundle-1.6.0.jar
-parquet-jackson-1.7.0.jar
+parquet-jackson-1.8.1.jar
 pmml-model-1.2.15.jar
 pmml-schema-1.2.15.jar
 protobuf-java-2.5.0.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/776d183c/dev/deps/spark-deps-hadoop-2.7
----------------------------------------------------------------------
diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7
index 18c136e..6b99953 100644
--- a/dev/deps/spark-deps-hadoop-2.7
+++ b/dev/deps/spark-deps-hadoop-2.7
@@ -145,14 +145,13 @@ opencsv-2.3.jar
 oro-2.0.8.jar
 osgi-resource-locator-1.0.1.jar
 paranamer-2.8.jar
-parquet-column-1.7.0.jar
-parquet-common-1.7.0.jar
-parquet-encoding-1.7.0.jar
+parquet-column-1.8.1.jar
+parquet-common-1.8.1.jar
+parquet-encoding-1.8.1.jar
 parquet-format-2.3.0-incubating.jar
-parquet-generator-1.7.0.jar
-parquet-hadoop-1.7.0.jar
+parquet-hadoop-1.8.1.jar
 parquet-hadoop-bundle-1.6.0.jar
-parquet-jackson-1.7.0.jar
+parquet-jackson-1.8.1.jar
 pmml-model-1.2.15.jar
 pmml-schema-1.2.15.jar
 protobuf-java-2.5.0.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/776d183c/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 3fa0eeb..ce9aa9a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -133,7 +133,7 @@
     <!-- Version used for internal directory structure -->
     <hive.version.short>1.2.1</hive.version.short>
     <derby.version>10.11.1.1</derby.version>
-    <parquet.version>1.7.0</parquet.version>
+    <parquet.version>1.8.1</parquet.version>
     <hive.parquet.version>1.6.0</hive.parquet.version>
     <jetty.version>9.2.16.v20160414</jetty.version>
     <javaxservlet.version>3.1.0</javaxservlet.version>

http://git-wip-us.apache.org/repos/asf/spark/blob/776d183c/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
index cbe8f78..3f7a872 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
@@ -58,6 +58,8 @@ import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.hadoop.util.ConfigurationUtil;
 import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
 import org.apache.parquet.schema.Types;
 import org.apache.spark.sql.types.StructType;
 
@@ -186,15 +188,19 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
     if (columns == null) {
       this.requestedSchema = fileSchema;
     } else {
-      Types.MessageTypeBuilder builder = Types.buildMessage();
-      for (String s: columns) {
-        if (!fileSchema.containsField(s)) {
-          throw new IOException("Can only project existing columns. Unknown field: " + s +
-            " File schema:\n" + fileSchema);
+      if (columns.size() > 0) {
+        Types.MessageTypeBuilder builder = Types.buildMessage();
+        for (String s: columns) {
+          if (!fileSchema.containsField(s)) {
+            throw new IOException("Can only project existing columns. Unknown field: " + s +
+                    " File schema:\n" + fileSchema);
+          }
+          builder.addFields(fileSchema.getType(s));
         }
-        builder.addFields(fileSchema.getType(s));
+        this.requestedSchema = builder.named(CatalystSchemaConverter.SPARK_PARQUET_SCHEMA_NAME());
+      } else {
+        this.requestedSchema = CatalystSchemaConverter.EMPTY_MESSAGE();
       }
-      this.requestedSchema = builder.named("spark_schema");
     }
     this.sparkSchema = new CatalystSchemaConverter(config).convert(requestedSchema);
     this.reader = new ParquetFileReader(config, file, blocks, requestedSchema.getColumns());

http://git-wip-us.apache.org/repos/asf/spark/blob/776d183c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
index 850e807..9c885b2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
@@ -109,10 +109,14 @@ private[parquet] object CatalystReadSupport {
    */
   def clipParquetSchema(parquetSchema: MessageType, catalystSchema: StructType): MessageType = {
     val clippedParquetFields = clipParquetGroupFields(parquetSchema.asGroupType(), catalystSchema)
-    Types
-      .buildMessage()
-      .addFields(clippedParquetFields: _*)
-      .named(CatalystSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)
+    if (clippedParquetFields.isEmpty) {
+      CatalystSchemaConverter.EMPTY_MESSAGE
+    } else {
+      Types
+        .buildMessage()
+        .addFields(clippedParquetFields: _*)
+        .named(CatalystSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)
+    }
   }
 
   private def clipParquetType(parquetType: Type, catalystType: DataType): Type = {

http://git-wip-us.apache.org/repos/asf/spark/blob/776d183c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
index 6f6340f..3688c3e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
@@ -538,6 +538,22 @@ private[parquet] class CatalystSchemaConverter(
 private[parquet] object CatalystSchemaConverter {
   val SPARK_PARQUET_SCHEMA_NAME = "spark_schema"
 
+  // !! HACK ALERT !!
+  //
+  // PARQUET-363 & PARQUET-278: parquet-mr 1.8.1 doesn't allow constructing empty GroupType,
+  // which means we cannot build an empty projection (select no columns) for queries like
+  // `SELECT COUNT(*) FROM t`. This issue has been fixed in parquet-mr 1.8.2-SNAPSHOT.
+  //
+  // To work around this problem, here we first construct a `MessageType` with a single dummy
+  // field, and then remove the field to obtain an empty `MessageType`.
+  //
+  // TODO Revert this change after upgrading parquet-mr to 1.8.2+
+  val EMPTY_MESSAGE = Types
+      .buildMessage()
+      .required(PrimitiveType.PrimitiveTypeName.INT32).named("dummy")
+      .named(CatalystSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)
+  EMPTY_MESSAGE.getFields.clear()
+
   def checkFieldName(name: String): Unit = {
     // ,;{}()\n\t= and space are special characters in Parquet schema
     checkConversionRequirement(

http://git-wip-us.apache.org/repos/asf/spark/blob/776d183c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 95afdc7..6240812 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -22,8 +22,6 @@ import java.io.Serializable
 import org.apache.parquet.filter2.predicate._
 import org.apache.parquet.filter2.predicate.FilterApi._
 import org.apache.parquet.io.api.Binary
-import org.apache.parquet.schema.OriginalType
-import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
 
 import org.apache.spark.sql.sources
 import org.apache.spark.sql.types._
@@ -53,18 +51,15 @@ private[sql] object ParquetFilters {
     case DoubleType =>
       (n: String, v: Any) => FilterApi.eq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
 
-    // See https://issues.apache.org/jira/browse/SPARK-11153
-    /*
     // Binary.fromString and Binary.fromByteArray don't accept null values
     case StringType =>
       (n: String, v: Any) => FilterApi.eq(
         binaryColumn(n),
-        Option(v).map(s => Binary.fromByteArray(s.asInstanceOf[String].getBytes("utf-8"))).orNull)
+        Option(v).map(s => Binary.fromString(s.asInstanceOf[String])).orNull)
     case BinaryType =>
       (n: String, v: Any) => FilterApi.eq(
         binaryColumn(n),
-        Option(v).map(b => Binary.fromByteArray(v.asInstanceOf[Array[Byte]])).orNull)
-     */
+        Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
   }
 
   private val makeNotEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -79,17 +74,14 @@ private[sql] object ParquetFilters {
     case DoubleType =>
       (n: String, v: Any) => FilterApi.notEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
 
-    // See https://issues.apache.org/jira/browse/SPARK-11153
-    /*
     case StringType =>
       (n: String, v: Any) => FilterApi.notEq(
         binaryColumn(n),
-        Option(v).map(s => Binary.fromByteArray(s.asInstanceOf[String].getBytes("utf-8"))).orNull)
+        Option(v).map(s => Binary.fromString(s.asInstanceOf[String])).orNull)
     case BinaryType =>
       (n: String, v: Any) => FilterApi.notEq(
         binaryColumn(n),
-        Option(v).map(b => Binary.fromByteArray(v.asInstanceOf[Array[Byte]])).orNull)
-     */
+        Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
   }
 
   private val makeLt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -102,16 +94,13 @@ private[sql] object ParquetFilters {
     case DoubleType =>
       (n: String, v: Any) => FilterApi.lt(doubleColumn(n), v.asInstanceOf[java.lang.Double])
 
-    // See https://issues.apache.org/jira/browse/SPARK-11153
-    /*
     case StringType =>
       (n: String, v: Any) =>
         FilterApi.lt(binaryColumn(n),
-          Binary.fromByteArray(v.asInstanceOf[String].getBytes("utf-8")))
+          Binary.fromString(v.asInstanceOf[String]))
     case BinaryType =>
       (n: String, v: Any) =>
-        FilterApi.lt(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
-     */
+        FilterApi.lt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
   }
 
   private val makeLtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -124,16 +113,13 @@ private[sql] object ParquetFilters {
     case DoubleType =>
       (n: String, v: Any) => FilterApi.ltEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
 
-    // See https://issues.apache.org/jira/browse/SPARK-11153
-    /*
     case StringType =>
       (n: String, v: Any) =>
         FilterApi.ltEq(binaryColumn(n),
-          Binary.fromByteArray(v.asInstanceOf[String].getBytes("utf-8")))
+          Binary.fromString(v.asInstanceOf[String]))
     case BinaryType =>
       (n: String, v: Any) =>
-        FilterApi.ltEq(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
-     */
+        FilterApi.ltEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
   }
 
   private val makeGt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -147,15 +133,13 @@ private[sql] object ParquetFilters {
       (n: String, v: Any) => FilterApi.gt(doubleColumn(n), v.asInstanceOf[java.lang.Double])
 
     // See https://issues.apache.org/jira/browse/SPARK-11153
-    /*
     case StringType =>
       (n: String, v: Any) =>
         FilterApi.gt(binaryColumn(n),
-          Binary.fromByteArray(v.asInstanceOf[String].getBytes("utf-8")))
+          Binary.fromString(v.asInstanceOf[String]))
     case BinaryType =>
       (n: String, v: Any) =>
-        FilterApi.gt(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
-     */
+        FilterApi.gt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
   }
 
   private val makeGtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -168,16 +152,13 @@ private[sql] object ParquetFilters {
     case DoubleType =>
       (n: String, v: Any) => FilterApi.gtEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
 
-    // See https://issues.apache.org/jira/browse/SPARK-11153
-    /*
     case StringType =>
       (n: String, v: Any) =>
         FilterApi.gtEq(binaryColumn(n),
-          Binary.fromByteArray(v.asInstanceOf[String].getBytes("utf-8")))
+          Binary.fromString(v.asInstanceOf[String]))
     case BinaryType =>
       (n: String, v: Any) =>
-        FilterApi.gtEq(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
-     */
+        FilterApi.gtEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
   }
 
   private val makeInSet: PartialFunction[DataType, (String, Set[Any]) => FilterPredicate] = {
@@ -194,17 +175,14 @@ private[sql] object ParquetFilters {
       (n: String, v: Set[Any]) =>
         FilterApi.userDefined(doubleColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Double]]))
 
-    // See https://issues.apache.org/jira/browse/SPARK-11153
-    /*
     case StringType =>
       (n: String, v: Set[Any]) =>
         FilterApi.userDefined(binaryColumn(n),
-          SetInFilter(v.map(s => Binary.fromByteArray(s.asInstanceOf[String].getBytes("utf-8")))))
+          SetInFilter(v.map(s => Binary.fromString(s.asInstanceOf[String]))))
     case BinaryType =>
       (n: String, v: Set[Any]) =>
         FilterApi.userDefined(binaryColumn(n),
-          SetInFilter(v.map(e => Binary.fromByteArray(e.asInstanceOf[Array[Byte]]))))
-     */
+          SetInFilter(v.map(e => Binary.fromReusedByteArray(e.asInstanceOf[Array[Byte]]))))
   }
 
   /**
@@ -228,8 +206,6 @@ private[sql] object ParquetFilters {
   def createFilter(schema: StructType, predicate: sources.Filter): Option[FilterPredicate] = {
     val dataTypeOf = getFieldMap(schema).toMap
 
-    relaxParquetValidTypeMap
-
     // NOTE:
     //
     // For any comparison operator `cmp`, both `a cmp NULL` and `NULL cmp a` evaluate to `NULL`,
@@ -299,35 +275,4 @@ private[sql] object ParquetFilters {
       case _ => None
     }
   }
-
-  // !! HACK ALERT !!
-  //
-  // This lazy val is a workaround for PARQUET-201, and should be removed once we upgrade to
-  // parquet-mr 1.8.1 or higher versions.
-  //
-  // In Parquet, not all types of columns can be used for filter push-down optimization.  The set
-  // of valid column types is controlled by `ValidTypeMap`.  Unfortunately, in parquet-mr 1.7.0 and
-  // prior versions, the limitation is too strict, and doesn't allow `BINARY (ENUM)` columns to be
-  // pushed down.
-  //
-  // This restriction is problematic for Spark SQL, because Spark SQL doesn't have a type that maps
-  // to Parquet original type `ENUM` directly, and always converts `ENUM` to `StringType`.  Thus,
-  // a predicate involving a `ENUM` field can be pushed-down as a string column, which is perfectly
-  // legal except that it fails the `ValidTypeMap` check.
-  //
-  // Here we add `BINARY (ENUM)` into `ValidTypeMap` lazily via reflection to workaround this issue.
-  private lazy val relaxParquetValidTypeMap: Unit = {
-    val constructor = Class
-      .forName(classOf[ValidTypeMap].getCanonicalName + "$FullTypeDescriptor")
-      .getDeclaredConstructor(classOf[PrimitiveTypeName], classOf[OriginalType])
-
-    constructor.setAccessible(true)
-    val enumTypeDescriptor = constructor
-      .newInstance(PrimitiveTypeName.BINARY, OriginalType.ENUM)
-      .asInstanceOf[AnyRef]
-
-    val addMethod = classOf[ValidTypeMap].getDeclaredMethods.find(_.getName == "add").get
-    addMethod.setAccessible(true)
-    addMethod.invoke(null, classOf[Binary], enumTypeDescriptor)
-  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/776d183c/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index 6db6492..0b5038c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources.parquet
 import scala.reflect.ClassTag
 import scala.reflect.runtime.universe.TypeTag
 
-import org.apache.parquet.schema.MessageTypeParser
+import org.apache.parquet.schema.{MessageType, MessageTypeParser}
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.ScalaReflection
@@ -1065,18 +1065,26 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
       parquetSchema: String,
       catalystSchema: StructType,
       expectedSchema: String): Unit = {
+    testSchemaClipping(testName, parquetSchema, catalystSchema,
+      MessageTypeParser.parseMessageType(expectedSchema))
+  }
+
+  private def testSchemaClipping(
+      testName: String,
+      parquetSchema: String,
+      catalystSchema: StructType,
+      expectedSchema: MessageType): Unit = {
     test(s"Clipping - $testName") {
-      val expected = MessageTypeParser.parseMessageType(expectedSchema)
       val actual = CatalystReadSupport.clipParquetSchema(
         MessageTypeParser.parseMessageType(parquetSchema), catalystSchema)
 
       try {
-        expected.checkContains(actual)
-        actual.checkContains(expected)
+        expectedSchema.checkContains(actual)
+        actual.checkContains(expectedSchema)
       } catch { case cause: Throwable =>
         fail(
           s"""Expected clipped schema:
-             |$expected
+             |$expectedSchema
              |Actual clipped schema:
              |$actual
            """.stripMargin,
@@ -1429,7 +1437,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
 
     catalystSchema = new StructType(),
 
-    expectedSchema = "message root {}")
+    expectedSchema = CatalystSchemaConverter.EMPTY_MESSAGE)
 
   testSchemaClipping(
     "disjoint field sets",

