This is an automated email from the ASF dual-hosted git repository.

dbtsai pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new fc45b27  [SPARK-31064][SQL] New Parquet Predicate Filter APIs with multi-part Identifier Support
fc45b27 is described below

commit fc45b274df2096e0d6340974feafb17f1b2f2cc4
Author: DB Tsai <d_t...@apple.com>
AuthorDate: Fri Mar 6 21:09:24 2020 +0000

    [SPARK-31064][SQL] New Parquet Predicate Filter APIs with multi-part Identifier Support
    
    ### What changes were proposed in this pull request?
    Parquet's org.apache.parquet.filter2.predicate.FilterApi uses `dots` as separators to split a column name into the parts of a nested field path. The drawback is that this causes issues when a field name itself contains `dots`.
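    
    For instance, a minimal sketch of the ambiguity with the dot-based API (the column names here are illustrative):
    
    ```scala
    import org.apache.parquet.filter2.predicate.FilterApi
    
    // FilterApi splits the name on dots, so this always means the field `b`
    // nested inside the group `a`; a top-level column whose name is literally
    // "a.b" cannot be expressed through this API at all.
    val col = FilterApi.intColumn("a.b")
    ```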
    
    The new APIs take an array of strings directly, one element per part of the nested field path, so there is no confusion from using `dots` as separators.
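    
    A minimal sketch of the new call shape (a hypothetical call site; the real callers live in ParquetFilters.scala below):
    
    ```scala
    import org.apache.parquet.filter2.predicate.SparkFilterApi
    
    // Each array element is one part of the field path, so a dot inside an
    // element is treated as part of that single name.
    val nested = SparkFilterApi.intColumn(Array("a", "b"))  // field `b` nested in group `a`
    val dotted = SparkFilterApi.intColumn(Array("a.b"))     // top-level column named "a.b"
    ```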
    
    ### Why are the changes needed?
    To support nested predicate pushdown and predicate pushdown for columns containing `dots`.
    
    ### Does this PR introduce any user-facing change?
    No.
    
    ### How was this patch tested?
    Existing UTs.
    
    Closes #27824 from dbtsai/SPARK-31064.
    
    Authored-by: DB Tsai <d_t...@apple.com>
    Signed-off-by: DB Tsai <d_t...@apple.com>
    (cherry picked from commit 7911f952029682e12fd1f90665f2869e1461c365)
    Signed-off-by: DB Tsai <d_t...@apple.com>
---
 .../parquet/filter2/predicate/SparkFilterApi.java  |  57 ++++++
 .../datasources/parquet/ParquetFilters.scala       | 192 +++++++++++----------
 2 files changed, 156 insertions(+), 93 deletions(-)

diff --git a/sql/core/src/main/java/org/apache/parquet/filter2/predicate/SparkFilterApi.java b/sql/core/src/main/java/org/apache/parquet/filter2/predicate/SparkFilterApi.java
new file mode 100644
index 0000000..884042c
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/parquet/filter2/predicate/SparkFilterApi.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.filter2.predicate;
+
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.filter2.predicate.Operators.BinaryColumn;
+import org.apache.parquet.filter2.predicate.Operators.BooleanColumn;
+import org.apache.parquet.filter2.predicate.Operators.DoubleColumn;
+import org.apache.parquet.filter2.predicate.Operators.FloatColumn;
+import org.apache.parquet.filter2.predicate.Operators.IntColumn;
+import org.apache.parquet.filter2.predicate.Operators.LongColumn;
+
+/**
+ * TODO (PARQUET-1809): This is a temporary workaround; it is intended to be moved to Parquet.
+ */
+public final class SparkFilterApi {
+  public static IntColumn intColumn(String[] path) {
+    return new IntColumn(ColumnPath.get(path));
+  }
+
+  public static LongColumn longColumn(String[] path) {
+    return new LongColumn(ColumnPath.get(path));
+  }
+
+  public static FloatColumn floatColumn(String[] path) {
+    return new FloatColumn(ColumnPath.get(path));
+  }
+
+  public static DoubleColumn doubleColumn(String[] path) {
+    return new DoubleColumn(ColumnPath.get(path));
+  }
+
+  public static BooleanColumn booleanColumn(String[] path) {
+    return new BooleanColumn(ColumnPath.get(path));
+  }
+
+  public static BinaryColumn binaryColumn(String[] path) {
+    return new BinaryColumn(ColumnPath.get(path));
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 948a120..0706501 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -25,7 +25,7 @@ import java.util.Locale
 import scala.collection.JavaConverters.asScalaBufferConverter
 
 import org.apache.parquet.filter2.predicate._
-import org.apache.parquet.filter2.predicate.FilterApi._
+import org.apache.parquet.filter2.predicate.SparkFilterApi._
 import org.apache.parquet.io.api.Binary
 import org.apache.parquet.schema.{DecimalMetadata, MessageType, OriginalType, PrimitiveComparator}
 import org.apache.parquet.schema.OriginalType._
@@ -126,264 +126,270 @@ class ParquetFilters(
     Binary.fromConstantByteArray(fixedLengthBytes, 0, numBytes)
   }
 
-  private val makeEq: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
+  private val makeEq:
+    PartialFunction[ParquetSchemaType, (Array[String], Any) => FilterPredicate] = {
     case ParquetBooleanType =>
-      (n: String, v: Any) => FilterApi.eq(booleanColumn(n), v.asInstanceOf[JBoolean])
+      (n: Array[String], v: Any) => FilterApi.eq(booleanColumn(n), v.asInstanceOf[JBoolean])
     case ParquetByteType | ParquetShortType | ParquetIntegerType =>
-      (n: String, v: Any) => FilterApi.eq(
+      (n: Array[String], v: Any) => FilterApi.eq(
         intColumn(n),
         Option(v).map(_.asInstanceOf[Number].intValue.asInstanceOf[Integer]).orNull)
     case ParquetLongType =>
-      (n: String, v: Any) => FilterApi.eq(longColumn(n), v.asInstanceOf[JLong])
+      (n: Array[String], v: Any) => FilterApi.eq(longColumn(n), v.asInstanceOf[JLong])
     case ParquetFloatType =>
-      (n: String, v: Any) => FilterApi.eq(floatColumn(n), v.asInstanceOf[JFloat])
+      (n: Array[String], v: Any) => FilterApi.eq(floatColumn(n), v.asInstanceOf[JFloat])
     case ParquetDoubleType =>
-      (n: String, v: Any) => FilterApi.eq(doubleColumn(n), v.asInstanceOf[JDouble])
+      (n: Array[String], v: Any) => FilterApi.eq(doubleColumn(n), v.asInstanceOf[JDouble])
 
     // Binary.fromString and Binary.fromByteArray don't accept null values
     case ParquetStringType =>
-      (n: String, v: Any) => FilterApi.eq(
+      (n: Array[String], v: Any) => FilterApi.eq(
         binaryColumn(n),
         Option(v).map(s => Binary.fromString(s.asInstanceOf[String])).orNull)
     case ParquetBinaryType =>
-      (n: String, v: Any) => FilterApi.eq(
+      (n: Array[String], v: Any) => FilterApi.eq(
         binaryColumn(n),
         Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
     case ParquetDateType if pushDownDate =>
-      (n: String, v: Any) => FilterApi.eq(
+      (n: Array[String], v: Any) => FilterApi.eq(
         intColumn(n),
         Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
     case ParquetTimestampMicrosType if pushDownTimestamp =>
-      (n: String, v: Any) => FilterApi.eq(
+      (n: Array[String], v: Any) => FilterApi.eq(
         longColumn(n),
         Option(v).map(t => DateTimeUtils.fromJavaTimestamp(t.asInstanceOf[Timestamp])
           .asInstanceOf[JLong]).orNull)
     case ParquetTimestampMillisType if pushDownTimestamp =>
-      (n: String, v: Any) => FilterApi.eq(
+      (n: Array[String], v: Any) => FilterApi.eq(
         longColumn(n),
         Option(v).map(_.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong]).orNull)
 
     case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal =>
-      (n: String, v: Any) => FilterApi.eq(
+      (n: Array[String], v: Any) => FilterApi.eq(
         intColumn(n),
         Option(v).map(d => decimalToInt32(d.asInstanceOf[JBigDecimal])).orNull)
     case ParquetSchemaType(DECIMAL, INT64, _, _) if pushDownDecimal =>
-      (n: String, v: Any) => FilterApi.eq(
+      (n: Array[String], v: Any) => FilterApi.eq(
         longColumn(n),
         Option(v).map(d => decimalToInt64(d.asInstanceOf[JBigDecimal])).orNull)
     case ParquetSchemaType(DECIMAL, FIXED_LEN_BYTE_ARRAY, length, _) if pushDownDecimal =>
-      (n: String, v: Any) => FilterApi.eq(
+      (n: Array[String], v: Any) => FilterApi.eq(
         binaryColumn(n),
         Option(v).map(d => decimalToByteArray(d.asInstanceOf[JBigDecimal], length)).orNull)
   }
 
-  private val makeNotEq: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
+  private val makeNotEq:
+    PartialFunction[ParquetSchemaType, (Array[String], Any) => FilterPredicate] = {
     case ParquetBooleanType =>
-      (n: String, v: Any) => FilterApi.notEq(booleanColumn(n), v.asInstanceOf[JBoolean])
+      (n: Array[String], v: Any) => FilterApi.notEq(booleanColumn(n), v.asInstanceOf[JBoolean])
     case ParquetByteType | ParquetShortType | ParquetIntegerType =>
-      (n: String, v: Any) => FilterApi.notEq(
+      (n: Array[String], v: Any) => FilterApi.notEq(
         intColumn(n),
         Option(v).map(_.asInstanceOf[Number].intValue.asInstanceOf[Integer]).orNull)
     case ParquetLongType =>
-      (n: String, v: Any) => FilterApi.notEq(longColumn(n), v.asInstanceOf[JLong])
+      (n: Array[String], v: Any) => FilterApi.notEq(longColumn(n), v.asInstanceOf[JLong])
     case ParquetFloatType =>
-      (n: String, v: Any) => FilterApi.notEq(floatColumn(n), v.asInstanceOf[JFloat])
+      (n: Array[String], v: Any) => FilterApi.notEq(floatColumn(n), v.asInstanceOf[JFloat])
     case ParquetDoubleType =>
-      (n: String, v: Any) => FilterApi.notEq(doubleColumn(n), v.asInstanceOf[JDouble])
+      (n: Array[String], v: Any) => FilterApi.notEq(doubleColumn(n), v.asInstanceOf[JDouble])
 
     case ParquetStringType =>
-      (n: String, v: Any) => FilterApi.notEq(
+      (n: Array[String], v: Any) => FilterApi.notEq(
         binaryColumn(n),
         Option(v).map(s => Binary.fromString(s.asInstanceOf[String])).orNull)
     case ParquetBinaryType =>
-      (n: String, v: Any) => FilterApi.notEq(
+      (n: Array[String], v: Any) => FilterApi.notEq(
         binaryColumn(n),
         Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
     case ParquetDateType if pushDownDate =>
-      (n: String, v: Any) => FilterApi.notEq(
+      (n: Array[String], v: Any) => FilterApi.notEq(
         intColumn(n),
         Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
     case ParquetTimestampMicrosType if pushDownTimestamp =>
-      (n: String, v: Any) => FilterApi.notEq(
+      (n: Array[String], v: Any) => FilterApi.notEq(
         longColumn(n),
         Option(v).map(t => DateTimeUtils.fromJavaTimestamp(t.asInstanceOf[Timestamp])
           .asInstanceOf[JLong]).orNull)
     case ParquetTimestampMillisType if pushDownTimestamp =>
-      (n: String, v: Any) => FilterApi.notEq(
+      (n: Array[String], v: Any) => FilterApi.notEq(
         longColumn(n),
         Option(v).map(_.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong]).orNull)
 
     case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal =>
-      (n: String, v: Any) => FilterApi.notEq(
+      (n: Array[String], v: Any) => FilterApi.notEq(
         intColumn(n),
         Option(v).map(d => decimalToInt32(d.asInstanceOf[JBigDecimal])).orNull)
     case ParquetSchemaType(DECIMAL, INT64, _, _) if pushDownDecimal =>
-      (n: String, v: Any) => FilterApi.notEq(
+      (n: Array[String], v: Any) => FilterApi.notEq(
         longColumn(n),
         Option(v).map(d => decimalToInt64(d.asInstanceOf[JBigDecimal])).orNull)
     case ParquetSchemaType(DECIMAL, FIXED_LEN_BYTE_ARRAY, length, _) if pushDownDecimal =>
-      (n: String, v: Any) => FilterApi.notEq(
+      (n: Array[String], v: Any) => FilterApi.notEq(
         binaryColumn(n),
         Option(v).map(d => decimalToByteArray(d.asInstanceOf[JBigDecimal], length)).orNull)
   }
 
-  private val makeLt: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
+  private val makeLt:
+    PartialFunction[ParquetSchemaType, (Array[String], Any) => FilterPredicate] = {
     case ParquetByteType | ParquetShortType | ParquetIntegerType =>
-      (n: String, v: Any) =>
+      (n: Array[String], v: Any) =>
         FilterApi.lt(intColumn(n), v.asInstanceOf[Number].intValue.asInstanceOf[Integer])
     case ParquetLongType =>
-      (n: String, v: Any) => FilterApi.lt(longColumn(n), v.asInstanceOf[JLong])
+      (n: Array[String], v: Any) => FilterApi.lt(longColumn(n), v.asInstanceOf[JLong])
     case ParquetFloatType =>
-      (n: String, v: Any) => FilterApi.lt(floatColumn(n), v.asInstanceOf[JFloat])
+      (n: Array[String], v: Any) => FilterApi.lt(floatColumn(n), v.asInstanceOf[JFloat])
     case ParquetDoubleType =>
-      (n: String, v: Any) => FilterApi.lt(doubleColumn(n), v.asInstanceOf[JDouble])
+      (n: Array[String], v: Any) => FilterApi.lt(doubleColumn(n), v.asInstanceOf[JDouble])
 
     case ParquetStringType =>
-      (n: String, v: Any) =>
+      (n: Array[String], v: Any) =>
         FilterApi.lt(binaryColumn(n), Binary.fromString(v.asInstanceOf[String]))
     case ParquetBinaryType =>
-      (n: String, v: Any) =>
+      (n: Array[String], v: Any) =>
         FilterApi.lt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
     case ParquetDateType if pushDownDate =>
-      (n: String, v: Any) =>
+      (n: Array[String], v: Any) =>
         FilterApi.lt(intColumn(n), dateToDays(v.asInstanceOf[Date]).asInstanceOf[Integer])
     case ParquetTimestampMicrosType if pushDownTimestamp =>
-      (n: String, v: Any) => FilterApi.lt(
+      (n: Array[String], v: Any) => FilterApi.lt(
         longColumn(n),
         DateTimeUtils.fromJavaTimestamp(v.asInstanceOf[Timestamp]).asInstanceOf[JLong])
     case ParquetTimestampMillisType if pushDownTimestamp =>
-      (n: String, v: Any) => FilterApi.lt(
+      (n: Array[String], v: Any) => FilterApi.lt(
         longColumn(n),
         v.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong])
 
     case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal =>
-      (n: String, v: Any) =>
+      (n: Array[String], v: Any) =>
         FilterApi.lt(intColumn(n), decimalToInt32(v.asInstanceOf[JBigDecimal]))
     case ParquetSchemaType(DECIMAL, INT64, _, _) if pushDownDecimal =>
-      (n: String, v: Any) =>
+      (n: Array[String], v: Any) =>
         FilterApi.lt(longColumn(n), decimalToInt64(v.asInstanceOf[JBigDecimal]))
     case ParquetSchemaType(DECIMAL, FIXED_LEN_BYTE_ARRAY, length, _) if pushDownDecimal =>
-      (n: String, v: Any) =>
+      (n: Array[String], v: Any) =>
         FilterApi.lt(binaryColumn(n), decimalToByteArray(v.asInstanceOf[JBigDecimal], length))
   }
 
-  private val makeLtEq: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
+  private val makeLtEq:
+    PartialFunction[ParquetSchemaType, (Array[String], Any) => FilterPredicate] = {
     case ParquetByteType | ParquetShortType | ParquetIntegerType =>
-      (n: String, v: Any) =>
+      (n: Array[String], v: Any) =>
         FilterApi.ltEq(intColumn(n), v.asInstanceOf[Number].intValue.asInstanceOf[Integer])
     case ParquetLongType =>
-      (n: String, v: Any) => FilterApi.ltEq(longColumn(n), v.asInstanceOf[JLong])
+      (n: Array[String], v: Any) => FilterApi.ltEq(longColumn(n), v.asInstanceOf[JLong])
     case ParquetFloatType =>
-      (n: String, v: Any) => FilterApi.ltEq(floatColumn(n), v.asInstanceOf[JFloat])
+      (n: Array[String], v: Any) => FilterApi.ltEq(floatColumn(n), v.asInstanceOf[JFloat])
     case ParquetDoubleType =>
-      (n: String, v: Any) => FilterApi.ltEq(doubleColumn(n), v.asInstanceOf[JDouble])
+      (n: Array[String], v: Any) => FilterApi.ltEq(doubleColumn(n), v.asInstanceOf[JDouble])
 
     case ParquetStringType =>
-      (n: String, v: Any) =>
+      (n: Array[String], v: Any) =>
         FilterApi.ltEq(binaryColumn(n), Binary.fromString(v.asInstanceOf[String]))
     case ParquetBinaryType =>
-      (n: String, v: Any) =>
+      (n: Array[String], v: Any) =>
         FilterApi.ltEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
     case ParquetDateType if pushDownDate =>
-      (n: String, v: Any) =>
+      (n: Array[String], v: Any) =>
         FilterApi.ltEq(intColumn(n), dateToDays(v.asInstanceOf[Date]).asInstanceOf[Integer])
     case ParquetTimestampMicrosType if pushDownTimestamp =>
-      (n: String, v: Any) => FilterApi.ltEq(
+      (n: Array[String], v: Any) => FilterApi.ltEq(
         longColumn(n),
         DateTimeUtils.fromJavaTimestamp(v.asInstanceOf[Timestamp]).asInstanceOf[JLong])
     case ParquetTimestampMillisType if pushDownTimestamp =>
-      (n: String, v: Any) => FilterApi.ltEq(
+      (n: Array[String], v: Any) => FilterApi.ltEq(
         longColumn(n),
         v.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong])
 
     case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal =>
-      (n: String, v: Any) =>
+      (n: Array[String], v: Any) =>
         FilterApi.ltEq(intColumn(n), decimalToInt32(v.asInstanceOf[JBigDecimal]))
     case ParquetSchemaType(DECIMAL, INT64, _, _) if pushDownDecimal =>
-      (n: String, v: Any) =>
+      (n: Array[String], v: Any) =>
         FilterApi.ltEq(longColumn(n), decimalToInt64(v.asInstanceOf[JBigDecimal]))
     case ParquetSchemaType(DECIMAL, FIXED_LEN_BYTE_ARRAY, length, _) if pushDownDecimal =>
-      (n: String, v: Any) =>
+      (n: Array[String], v: Any) =>
         FilterApi.ltEq(binaryColumn(n), decimalToByteArray(v.asInstanceOf[JBigDecimal], length))
   }
 
-  private val makeGt: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
+  private val makeGt:
+    PartialFunction[ParquetSchemaType, (Array[String], Any) => FilterPredicate] = {
     case ParquetByteType | ParquetShortType | ParquetIntegerType =>
-      (n: String, v: Any) =>
+      (n: Array[String], v: Any) =>
         FilterApi.gt(intColumn(n), v.asInstanceOf[Number].intValue.asInstanceOf[Integer])
     case ParquetLongType =>
-      (n: String, v: Any) => FilterApi.gt(longColumn(n), v.asInstanceOf[JLong])
+      (n: Array[String], v: Any) => FilterApi.gt(longColumn(n), v.asInstanceOf[JLong])
     case ParquetFloatType =>
-      (n: String, v: Any) => FilterApi.gt(floatColumn(n), v.asInstanceOf[JFloat])
+      (n: Array[String], v: Any) => FilterApi.gt(floatColumn(n), v.asInstanceOf[JFloat])
     case ParquetDoubleType =>
-      (n: String, v: Any) => FilterApi.gt(doubleColumn(n), v.asInstanceOf[JDouble])
+      (n: Array[String], v: Any) => FilterApi.gt(doubleColumn(n), v.asInstanceOf[JDouble])
 
     case ParquetStringType =>
-      (n: String, v: Any) =>
+      (n: Array[String], v: Any) =>
         FilterApi.gt(binaryColumn(n), Binary.fromString(v.asInstanceOf[String]))
     case ParquetBinaryType =>
-      (n: String, v: Any) =>
+      (n: Array[String], v: Any) =>
         FilterApi.gt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
     case ParquetDateType if pushDownDate =>
-      (n: String, v: Any) =>
+      (n: Array[String], v: Any) =>
         FilterApi.gt(intColumn(n), dateToDays(v.asInstanceOf[Date]).asInstanceOf[Integer])
     case ParquetTimestampMicrosType if pushDownTimestamp =>
-      (n: String, v: Any) => FilterApi.gt(
+      (n: Array[String], v: Any) => FilterApi.gt(
         longColumn(n),
         DateTimeUtils.fromJavaTimestamp(v.asInstanceOf[Timestamp]).asInstanceOf[JLong])
     case ParquetTimestampMillisType if pushDownTimestamp =>
-      (n: String, v: Any) => FilterApi.gt(
+      (n: Array[String], v: Any) => FilterApi.gt(
         longColumn(n),
         v.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong])
 
     case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal =>
-      (n: String, v: Any) =>
+      (n: Array[String], v: Any) =>
         FilterApi.gt(intColumn(n), decimalToInt32(v.asInstanceOf[JBigDecimal]))
     case ParquetSchemaType(DECIMAL, INT64, _, _) if pushDownDecimal =>
-      (n: String, v: Any) =>
+      (n: Array[String], v: Any) =>
         FilterApi.gt(longColumn(n), decimalToInt64(v.asInstanceOf[JBigDecimal]))
     case ParquetSchemaType(DECIMAL, FIXED_LEN_BYTE_ARRAY, length, _) if pushDownDecimal =>
-      (n: String, v: Any) =>
+      (n: Array[String], v: Any) =>
         FilterApi.gt(binaryColumn(n), decimalToByteArray(v.asInstanceOf[JBigDecimal], length))
   }
 
-  private val makeGtEq: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
+  private val makeGtEq:
+    PartialFunction[ParquetSchemaType, (Array[String], Any) => FilterPredicate] = {
     case ParquetByteType | ParquetShortType | ParquetIntegerType =>
-      (n: String, v: Any) =>
+      (n: Array[String], v: Any) =>
         FilterApi.gtEq(intColumn(n), v.asInstanceOf[Number].intValue.asInstanceOf[Integer])
     case ParquetLongType =>
-      (n: String, v: Any) => FilterApi.gtEq(longColumn(n), v.asInstanceOf[JLong])
+      (n: Array[String], v: Any) => FilterApi.gtEq(longColumn(n), v.asInstanceOf[JLong])
     case ParquetFloatType =>
-      (n: String, v: Any) => FilterApi.gtEq(floatColumn(n), v.asInstanceOf[JFloat])
+      (n: Array[String], v: Any) => FilterApi.gtEq(floatColumn(n), v.asInstanceOf[JFloat])
     case ParquetDoubleType =>
-      (n: String, v: Any) => FilterApi.gtEq(doubleColumn(n), v.asInstanceOf[JDouble])
+      (n: Array[String], v: Any) => FilterApi.gtEq(doubleColumn(n), v.asInstanceOf[JDouble])
 
     case ParquetStringType =>
-      (n: String, v: Any) =>
+      (n: Array[String], v: Any) =>
         FilterApi.gtEq(binaryColumn(n), Binary.fromString(v.asInstanceOf[String]))
     case ParquetBinaryType =>
-      (n: String, v: Any) =>
+      (n: Array[String], v: Any) =>
         FilterApi.gtEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
     case ParquetDateType if pushDownDate =>
-      (n: String, v: Any) =>
+      (n: Array[String], v: Any) =>
         FilterApi.gtEq(intColumn(n), dateToDays(v.asInstanceOf[Date]).asInstanceOf[Integer])
     case ParquetTimestampMicrosType if pushDownTimestamp =>
-      (n: String, v: Any) => FilterApi.gtEq(
+      (n: Array[String], v: Any) => FilterApi.gtEq(
         longColumn(n),
         DateTimeUtils.fromJavaTimestamp(v.asInstanceOf[Timestamp]).asInstanceOf[JLong])
     case ParquetTimestampMillisType if pushDownTimestamp =>
-      (n: String, v: Any) => FilterApi.gtEq(
+      (n: Array[String], v: Any) => FilterApi.gtEq(
         longColumn(n),
         v.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong])
 
     case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal =>
-      (n: String, v: Any) =>
+      (n: Array[String], v: Any) =>
         FilterApi.gtEq(intColumn(n), decimalToInt32(v.asInstanceOf[JBigDecimal]))
     case ParquetSchemaType(DECIMAL, INT64, _, _) if pushDownDecimal =>
-      (n: String, v: Any) =>
+      (n: Array[String], v: Any) =>
         FilterApi.gtEq(longColumn(n), decimalToInt64(v.asInstanceOf[JBigDecimal]))
     case ParquetSchemaType(DECIMAL, FIXED_LEN_BYTE_ARRAY, length, _) if pushDownDecimal =>
-      (n: String, v: Any) =>
+      (n: Array[String], v: Any) =>
         FilterApi.gtEq(binaryColumn(n), decimalToByteArray(v.asInstanceOf[JBigDecimal], length))
   }
 
@@ -503,38 +509,38 @@ class ParquetFilters(
     predicate match {
       case sources.IsNull(name) if canMakeFilterOn(name, null) =>
         makeEq.lift(nameToParquetField(name).fieldType)
-          .map(_(nameToParquetField(name).fieldName, null))
+          .map(_(Array(nameToParquetField(name).fieldName), null))
       case sources.IsNotNull(name) if canMakeFilterOn(name, null) =>
         makeNotEq.lift(nameToParquetField(name).fieldType)
-          .map(_(nameToParquetField(name).fieldName, null))
+          .map(_(Array(nameToParquetField(name).fieldName), null))
 
       case sources.EqualTo(name, value) if canMakeFilterOn(name, value) =>
         makeEq.lift(nameToParquetField(name).fieldType)
-          .map(_(nameToParquetField(name).fieldName, value))
+          .map(_(Array(nameToParquetField(name).fieldName), value))
       case sources.Not(sources.EqualTo(name, value)) if canMakeFilterOn(name, value) =>
         makeNotEq.lift(nameToParquetField(name).fieldType)
-          .map(_(nameToParquetField(name).fieldName, value))
+          .map(_(Array(nameToParquetField(name).fieldName), value))
 
       case sources.EqualNullSafe(name, value) if canMakeFilterOn(name, value) =>
         makeEq.lift(nameToParquetField(name).fieldType)
-          .map(_(nameToParquetField(name).fieldName, value))
+          .map(_(Array(nameToParquetField(name).fieldName), value))
       case sources.Not(sources.EqualNullSafe(name, value)) if canMakeFilterOn(name, value) =>
         makeNotEq.lift(nameToParquetField(name).fieldType)
-          .map(_(nameToParquetField(name).fieldName, value))
+          .map(_(Array(nameToParquetField(name).fieldName), value))
 
       case sources.LessThan(name, value) if canMakeFilterOn(name, value) =>
         makeLt.lift(nameToParquetField(name).fieldType)
-          .map(_(nameToParquetField(name).fieldName, value))
+          .map(_(Array(nameToParquetField(name).fieldName), value))
       case sources.LessThanOrEqual(name, value) if canMakeFilterOn(name, value) =>
         makeLtEq.lift(nameToParquetField(name).fieldType)
-          .map(_(nameToParquetField(name).fieldName, value))
+          .map(_(Array(nameToParquetField(name).fieldName), value))
 
       case sources.GreaterThan(name, value) if canMakeFilterOn(name, value) =>
         makeGt.lift(nameToParquetField(name).fieldType)
-          .map(_(nameToParquetField(name).fieldName, value))
+          .map(_(Array(nameToParquetField(name).fieldName), value))
       case sources.GreaterThanOrEqual(name, value) if canMakeFilterOn(name, value) =>
         makeGtEq.lift(nameToParquetField(name).fieldType)
-          .map(_(nameToParquetField(name).fieldName, value))
+          .map(_(Array(nameToParquetField(name).fieldName), value))
 
       case sources.And(lhs, rhs) =>
         // At here, it is not safe to just convert one side and remove the other side
@@ -585,13 +591,13 @@ class ParquetFilters(
         && values.distinct.length <= pushDownInFilterThreshold =>
         values.distinct.flatMap { v =>
           makeEq.lift(nameToParquetField(name).fieldType)
-            .map(_(nameToParquetField(name).fieldName, v))
+            .map(_(Array(nameToParquetField(name).fieldName), v))
         }.reduceLeftOption(FilterApi.or)
 
       case sources.StringStartsWith(name, prefix)
           if pushDownStartWith && canMakeFilterOn(name, prefix) =>
         Option(prefix).map { v =>
-          FilterApi.userDefined(binaryColumn(nameToParquetField(name).fieldName),
+          FilterApi.userDefined(binaryColumn(Array(nameToParquetField(name).fieldName)),
             new UserDefinedPredicate[Binary] with Serializable {
               private val strToBinary = Binary.fromReusedByteArray(v.getBytes)
               private val size = strToBinary.length


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
