This is an automated email from the ASF dual-hosted git repository.
dbtsai pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new fc45b27 [SPARK-31064][SQL] New Parquet Predicate Filter APIs with
multi-part Identifier Support
fc45b27 is described below
commit fc45b274df2096e0d6340974feafb17f1b2f2cc4
Author: DB Tsai <[email protected]>
AuthorDate: Fri Mar 6 21:09:24 2020 +0000
[SPARK-31064][SQL] New Parquet Predicate Filter APIs with multi-part
Identifier Support
### What changes were proposed in this pull request?
Parquet's org.apache.parquet.filter2.predicate.FilterApi uses `dots` as
separators to split a column name into the multiple parts of a nested field. The
drawback is that this causes issues when a field name itself contains `dots`.
The new APIs that will be added take an array of strings directly for the
multiple parts of a nested field, so there is no ambiguity from using `dots` as separators.
### Why are the changes needed?
To support nested predicate pushdown and predicate pushdown for columns
containing `dots`.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Existing UTs.
Closes #27824 from dbtsai/SPARK-31064.
Authored-by: DB Tsai <[email protected]>
Signed-off-by: DB Tsai <[email protected]>
(cherry picked from commit 7911f952029682e12fd1f90665f2869e1461c365)
Signed-off-by: DB Tsai <[email protected]>
---
.../parquet/filter2/predicate/SparkFilterApi.java | 57 ++++++
.../datasources/parquet/ParquetFilters.scala | 192 +++++++++++----------
2 files changed, 156 insertions(+), 93 deletions(-)
diff --git
a/sql/core/src/main/java/org/apache/parquet/filter2/predicate/SparkFilterApi.java
b/sql/core/src/main/java/org/apache/parquet/filter2/predicate/SparkFilterApi.java
new file mode 100644
index 0000000..884042c
--- /dev/null
+++
b/sql/core/src/main/java/org/apache/parquet/filter2/predicate/SparkFilterApi.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.filter2.predicate;
+
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.filter2.predicate.Operators.BinaryColumn;
+import org.apache.parquet.filter2.predicate.Operators.BooleanColumn;
+import org.apache.parquet.filter2.predicate.Operators.DoubleColumn;
+import org.apache.parquet.filter2.predicate.Operators.FloatColumn;
+import org.apache.parquet.filter2.predicate.Operators.IntColumn;
+import org.apache.parquet.filter2.predicate.Operators.LongColumn;
+
+/**
+ * TODO (PARQUET-1809): This is a temporary workaround; it is intended to be
moved to Parquet.
+ */
+public final class SparkFilterApi {
+ public static IntColumn intColumn(String[] path) {
+ return new IntColumn(ColumnPath.get(path));
+ }
+
+ public static LongColumn longColumn(String[] path) {
+ return new LongColumn(ColumnPath.get(path));
+ }
+
+ public static FloatColumn floatColumn(String[] path) {
+ return new FloatColumn(ColumnPath.get(path));
+ }
+
+ public static DoubleColumn doubleColumn(String[] path) {
+ return new DoubleColumn(ColumnPath.get(path));
+ }
+
+ public static BooleanColumn booleanColumn(String[] path) {
+ return new BooleanColumn(ColumnPath.get(path));
+ }
+
+ public static BinaryColumn binaryColumn(String[] path) {
+ return new BinaryColumn(ColumnPath.get(path));
+ }
+}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 948a120..0706501 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -25,7 +25,7 @@ import java.util.Locale
import scala.collection.JavaConverters.asScalaBufferConverter
import org.apache.parquet.filter2.predicate._
-import org.apache.parquet.filter2.predicate.FilterApi._
+import org.apache.parquet.filter2.predicate.SparkFilterApi._
import org.apache.parquet.io.api.Binary
import org.apache.parquet.schema.{DecimalMetadata, MessageType, OriginalType,
PrimitiveComparator}
import org.apache.parquet.schema.OriginalType._
@@ -126,264 +126,270 @@ class ParquetFilters(
Binary.fromConstantByteArray(fixedLengthBytes, 0, numBytes)
}
- private val makeEq: PartialFunction[ParquetSchemaType, (String, Any) =>
FilterPredicate] = {
+ private val makeEq:
+ PartialFunction[ParquetSchemaType, (Array[String], Any) =>
FilterPredicate] = {
case ParquetBooleanType =>
- (n: String, v: Any) => FilterApi.eq(booleanColumn(n),
v.asInstanceOf[JBoolean])
+ (n: Array[String], v: Any) => FilterApi.eq(booleanColumn(n),
v.asInstanceOf[JBoolean])
case ParquetByteType | ParquetShortType | ParquetIntegerType =>
- (n: String, v: Any) => FilterApi.eq(
+ (n: Array[String], v: Any) => FilterApi.eq(
intColumn(n),
Option(v).map(_.asInstanceOf[Number].intValue.asInstanceOf[Integer]).orNull)
case ParquetLongType =>
- (n: String, v: Any) => FilterApi.eq(longColumn(n), v.asInstanceOf[JLong])
+ (n: Array[String], v: Any) => FilterApi.eq(longColumn(n),
v.asInstanceOf[JLong])
case ParquetFloatType =>
- (n: String, v: Any) => FilterApi.eq(floatColumn(n),
v.asInstanceOf[JFloat])
+ (n: Array[String], v: Any) => FilterApi.eq(floatColumn(n),
v.asInstanceOf[JFloat])
case ParquetDoubleType =>
- (n: String, v: Any) => FilterApi.eq(doubleColumn(n),
v.asInstanceOf[JDouble])
+ (n: Array[String], v: Any) => FilterApi.eq(doubleColumn(n),
v.asInstanceOf[JDouble])
// Binary.fromString and Binary.fromByteArray don't accept null values
case ParquetStringType =>
- (n: String, v: Any) => FilterApi.eq(
+ (n: Array[String], v: Any) => FilterApi.eq(
binaryColumn(n),
Option(v).map(s => Binary.fromString(s.asInstanceOf[String])).orNull)
case ParquetBinaryType =>
- (n: String, v: Any) => FilterApi.eq(
+ (n: Array[String], v: Any) => FilterApi.eq(
binaryColumn(n),
Option(v).map(b =>
Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
case ParquetDateType if pushDownDate =>
- (n: String, v: Any) => FilterApi.eq(
+ (n: Array[String], v: Any) => FilterApi.eq(
intColumn(n),
Option(v).map(date =>
dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
case ParquetTimestampMicrosType if pushDownTimestamp =>
- (n: String, v: Any) => FilterApi.eq(
+ (n: Array[String], v: Any) => FilterApi.eq(
longColumn(n),
Option(v).map(t =>
DateTimeUtils.fromJavaTimestamp(t.asInstanceOf[Timestamp])
.asInstanceOf[JLong]).orNull)
case ParquetTimestampMillisType if pushDownTimestamp =>
- (n: String, v: Any) => FilterApi.eq(
+ (n: Array[String], v: Any) => FilterApi.eq(
longColumn(n),
Option(v).map(_.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong]).orNull)
case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal =>
- (n: String, v: Any) => FilterApi.eq(
+ (n: Array[String], v: Any) => FilterApi.eq(
intColumn(n),
Option(v).map(d => decimalToInt32(d.asInstanceOf[JBigDecimal])).orNull)
case ParquetSchemaType(DECIMAL, INT64, _, _) if pushDownDecimal =>
- (n: String, v: Any) => FilterApi.eq(
+ (n: Array[String], v: Any) => FilterApi.eq(
longColumn(n),
Option(v).map(d => decimalToInt64(d.asInstanceOf[JBigDecimal])).orNull)
case ParquetSchemaType(DECIMAL, FIXED_LEN_BYTE_ARRAY, length, _) if
pushDownDecimal =>
- (n: String, v: Any) => FilterApi.eq(
+ (n: Array[String], v: Any) => FilterApi.eq(
binaryColumn(n),
Option(v).map(d => decimalToByteArray(d.asInstanceOf[JBigDecimal],
length)).orNull)
}
- private val makeNotEq: PartialFunction[ParquetSchemaType, (String, Any) =>
FilterPredicate] = {
+ private val makeNotEq:
+ PartialFunction[ParquetSchemaType, (Array[String], Any) =>
FilterPredicate] = {
case ParquetBooleanType =>
- (n: String, v: Any) => FilterApi.notEq(booleanColumn(n),
v.asInstanceOf[JBoolean])
+ (n: Array[String], v: Any) => FilterApi.notEq(booleanColumn(n),
v.asInstanceOf[JBoolean])
case ParquetByteType | ParquetShortType | ParquetIntegerType =>
- (n: String, v: Any) => FilterApi.notEq(
+ (n: Array[String], v: Any) => FilterApi.notEq(
intColumn(n),
Option(v).map(_.asInstanceOf[Number].intValue.asInstanceOf[Integer]).orNull)
case ParquetLongType =>
- (n: String, v: Any) => FilterApi.notEq(longColumn(n),
v.asInstanceOf[JLong])
+ (n: Array[String], v: Any) => FilterApi.notEq(longColumn(n),
v.asInstanceOf[JLong])
case ParquetFloatType =>
- (n: String, v: Any) => FilterApi.notEq(floatColumn(n),
v.asInstanceOf[JFloat])
+ (n: Array[String], v: Any) => FilterApi.notEq(floatColumn(n),
v.asInstanceOf[JFloat])
case ParquetDoubleType =>
- (n: String, v: Any) => FilterApi.notEq(doubleColumn(n),
v.asInstanceOf[JDouble])
+ (n: Array[String], v: Any) => FilterApi.notEq(doubleColumn(n),
v.asInstanceOf[JDouble])
case ParquetStringType =>
- (n: String, v: Any) => FilterApi.notEq(
+ (n: Array[String], v: Any) => FilterApi.notEq(
binaryColumn(n),
Option(v).map(s => Binary.fromString(s.asInstanceOf[String])).orNull)
case ParquetBinaryType =>
- (n: String, v: Any) => FilterApi.notEq(
+ (n: Array[String], v: Any) => FilterApi.notEq(
binaryColumn(n),
Option(v).map(b =>
Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
case ParquetDateType if pushDownDate =>
- (n: String, v: Any) => FilterApi.notEq(
+ (n: Array[String], v: Any) => FilterApi.notEq(
intColumn(n),
Option(v).map(date =>
dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
case ParquetTimestampMicrosType if pushDownTimestamp =>
- (n: String, v: Any) => FilterApi.notEq(
+ (n: Array[String], v: Any) => FilterApi.notEq(
longColumn(n),
Option(v).map(t =>
DateTimeUtils.fromJavaTimestamp(t.asInstanceOf[Timestamp])
.asInstanceOf[JLong]).orNull)
case ParquetTimestampMillisType if pushDownTimestamp =>
- (n: String, v: Any) => FilterApi.notEq(
+ (n: Array[String], v: Any) => FilterApi.notEq(
longColumn(n),
Option(v).map(_.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong]).orNull)
case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal =>
- (n: String, v: Any) => FilterApi.notEq(
+ (n: Array[String], v: Any) => FilterApi.notEq(
intColumn(n),
Option(v).map(d => decimalToInt32(d.asInstanceOf[JBigDecimal])).orNull)
case ParquetSchemaType(DECIMAL, INT64, _, _) if pushDownDecimal =>
- (n: String, v: Any) => FilterApi.notEq(
+ (n: Array[String], v: Any) => FilterApi.notEq(
longColumn(n),
Option(v).map(d => decimalToInt64(d.asInstanceOf[JBigDecimal])).orNull)
case ParquetSchemaType(DECIMAL, FIXED_LEN_BYTE_ARRAY, length, _) if
pushDownDecimal =>
- (n: String, v: Any) => FilterApi.notEq(
+ (n: Array[String], v: Any) => FilterApi.notEq(
binaryColumn(n),
Option(v).map(d => decimalToByteArray(d.asInstanceOf[JBigDecimal],
length)).orNull)
}
- private val makeLt: PartialFunction[ParquetSchemaType, (String, Any) =>
FilterPredicate] = {
+ private val makeLt:
+ PartialFunction[ParquetSchemaType, (Array[String], Any) =>
FilterPredicate] = {
case ParquetByteType | ParquetShortType | ParquetIntegerType =>
- (n: String, v: Any) =>
+ (n: Array[String], v: Any) =>
FilterApi.lt(intColumn(n),
v.asInstanceOf[Number].intValue.asInstanceOf[Integer])
case ParquetLongType =>
- (n: String, v: Any) => FilterApi.lt(longColumn(n), v.asInstanceOf[JLong])
+ (n: Array[String], v: Any) => FilterApi.lt(longColumn(n),
v.asInstanceOf[JLong])
case ParquetFloatType =>
- (n: String, v: Any) => FilterApi.lt(floatColumn(n),
v.asInstanceOf[JFloat])
+ (n: Array[String], v: Any) => FilterApi.lt(floatColumn(n),
v.asInstanceOf[JFloat])
case ParquetDoubleType =>
- (n: String, v: Any) => FilterApi.lt(doubleColumn(n),
v.asInstanceOf[JDouble])
+ (n: Array[String], v: Any) => FilterApi.lt(doubleColumn(n),
v.asInstanceOf[JDouble])
case ParquetStringType =>
- (n: String, v: Any) =>
+ (n: Array[String], v: Any) =>
FilterApi.lt(binaryColumn(n),
Binary.fromString(v.asInstanceOf[String]))
case ParquetBinaryType =>
- (n: String, v: Any) =>
+ (n: Array[String], v: Any) =>
FilterApi.lt(binaryColumn(n),
Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
case ParquetDateType if pushDownDate =>
- (n: String, v: Any) =>
+ (n: Array[String], v: Any) =>
FilterApi.lt(intColumn(n),
dateToDays(v.asInstanceOf[Date]).asInstanceOf[Integer])
case ParquetTimestampMicrosType if pushDownTimestamp =>
- (n: String, v: Any) => FilterApi.lt(
+ (n: Array[String], v: Any) => FilterApi.lt(
longColumn(n),
DateTimeUtils.fromJavaTimestamp(v.asInstanceOf[Timestamp]).asInstanceOf[JLong])
case ParquetTimestampMillisType if pushDownTimestamp =>
- (n: String, v: Any) => FilterApi.lt(
+ (n: Array[String], v: Any) => FilterApi.lt(
longColumn(n),
v.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong])
case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal =>
- (n: String, v: Any) =>
+ (n: Array[String], v: Any) =>
FilterApi.lt(intColumn(n), decimalToInt32(v.asInstanceOf[JBigDecimal]))
case ParquetSchemaType(DECIMAL, INT64, _, _) if pushDownDecimal =>
- (n: String, v: Any) =>
+ (n: Array[String], v: Any) =>
FilterApi.lt(longColumn(n),
decimalToInt64(v.asInstanceOf[JBigDecimal]))
case ParquetSchemaType(DECIMAL, FIXED_LEN_BYTE_ARRAY, length, _) if
pushDownDecimal =>
- (n: String, v: Any) =>
+ (n: Array[String], v: Any) =>
FilterApi.lt(binaryColumn(n),
decimalToByteArray(v.asInstanceOf[JBigDecimal], length))
}
- private val makeLtEq: PartialFunction[ParquetSchemaType, (String, Any) =>
FilterPredicate] = {
+ private val makeLtEq:
+ PartialFunction[ParquetSchemaType, (Array[String], Any) =>
FilterPredicate] = {
case ParquetByteType | ParquetShortType | ParquetIntegerType =>
- (n: String, v: Any) =>
+ (n: Array[String], v: Any) =>
FilterApi.ltEq(intColumn(n),
v.asInstanceOf[Number].intValue.asInstanceOf[Integer])
case ParquetLongType =>
- (n: String, v: Any) => FilterApi.ltEq(longColumn(n),
v.asInstanceOf[JLong])
+ (n: Array[String], v: Any) => FilterApi.ltEq(longColumn(n),
v.asInstanceOf[JLong])
case ParquetFloatType =>
- (n: String, v: Any) => FilterApi.ltEq(floatColumn(n),
v.asInstanceOf[JFloat])
+ (n: Array[String], v: Any) => FilterApi.ltEq(floatColumn(n),
v.asInstanceOf[JFloat])
case ParquetDoubleType =>
- (n: String, v: Any) => FilterApi.ltEq(doubleColumn(n),
v.asInstanceOf[JDouble])
+ (n: Array[String], v: Any) => FilterApi.ltEq(doubleColumn(n),
v.asInstanceOf[JDouble])
case ParquetStringType =>
- (n: String, v: Any) =>
+ (n: Array[String], v: Any) =>
FilterApi.ltEq(binaryColumn(n),
Binary.fromString(v.asInstanceOf[String]))
case ParquetBinaryType =>
- (n: String, v: Any) =>
+ (n: Array[String], v: Any) =>
FilterApi.ltEq(binaryColumn(n),
Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
case ParquetDateType if pushDownDate =>
- (n: String, v: Any) =>
+ (n: Array[String], v: Any) =>
FilterApi.ltEq(intColumn(n),
dateToDays(v.asInstanceOf[Date]).asInstanceOf[Integer])
case ParquetTimestampMicrosType if pushDownTimestamp =>
- (n: String, v: Any) => FilterApi.ltEq(
+ (n: Array[String], v: Any) => FilterApi.ltEq(
longColumn(n),
DateTimeUtils.fromJavaTimestamp(v.asInstanceOf[Timestamp]).asInstanceOf[JLong])
case ParquetTimestampMillisType if pushDownTimestamp =>
- (n: String, v: Any) => FilterApi.ltEq(
+ (n: Array[String], v: Any) => FilterApi.ltEq(
longColumn(n),
v.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong])
case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal =>
- (n: String, v: Any) =>
+ (n: Array[String], v: Any) =>
FilterApi.ltEq(intColumn(n),
decimalToInt32(v.asInstanceOf[JBigDecimal]))
case ParquetSchemaType(DECIMAL, INT64, _, _) if pushDownDecimal =>
- (n: String, v: Any) =>
+ (n: Array[String], v: Any) =>
FilterApi.ltEq(longColumn(n),
decimalToInt64(v.asInstanceOf[JBigDecimal]))
case ParquetSchemaType(DECIMAL, FIXED_LEN_BYTE_ARRAY, length, _) if
pushDownDecimal =>
- (n: String, v: Any) =>
+ (n: Array[String], v: Any) =>
FilterApi.ltEq(binaryColumn(n),
decimalToByteArray(v.asInstanceOf[JBigDecimal], length))
}
- private val makeGt: PartialFunction[ParquetSchemaType, (String, Any) =>
FilterPredicate] = {
+ private val makeGt:
+ PartialFunction[ParquetSchemaType, (Array[String], Any) =>
FilterPredicate] = {
case ParquetByteType | ParquetShortType | ParquetIntegerType =>
- (n: String, v: Any) =>
+ (n: Array[String], v: Any) =>
FilterApi.gt(intColumn(n),
v.asInstanceOf[Number].intValue.asInstanceOf[Integer])
case ParquetLongType =>
- (n: String, v: Any) => FilterApi.gt(longColumn(n), v.asInstanceOf[JLong])
+ (n: Array[String], v: Any) => FilterApi.gt(longColumn(n),
v.asInstanceOf[JLong])
case ParquetFloatType =>
- (n: String, v: Any) => FilterApi.gt(floatColumn(n),
v.asInstanceOf[JFloat])
+ (n: Array[String], v: Any) => FilterApi.gt(floatColumn(n),
v.asInstanceOf[JFloat])
case ParquetDoubleType =>
- (n: String, v: Any) => FilterApi.gt(doubleColumn(n),
v.asInstanceOf[JDouble])
+ (n: Array[String], v: Any) => FilterApi.gt(doubleColumn(n),
v.asInstanceOf[JDouble])
case ParquetStringType =>
- (n: String, v: Any) =>
+ (n: Array[String], v: Any) =>
FilterApi.gt(binaryColumn(n),
Binary.fromString(v.asInstanceOf[String]))
case ParquetBinaryType =>
- (n: String, v: Any) =>
+ (n: Array[String], v: Any) =>
FilterApi.gt(binaryColumn(n),
Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
case ParquetDateType if pushDownDate =>
- (n: String, v: Any) =>
+ (n: Array[String], v: Any) =>
FilterApi.gt(intColumn(n),
dateToDays(v.asInstanceOf[Date]).asInstanceOf[Integer])
case ParquetTimestampMicrosType if pushDownTimestamp =>
- (n: String, v: Any) => FilterApi.gt(
+ (n: Array[String], v: Any) => FilterApi.gt(
longColumn(n),
DateTimeUtils.fromJavaTimestamp(v.asInstanceOf[Timestamp]).asInstanceOf[JLong])
case ParquetTimestampMillisType if pushDownTimestamp =>
- (n: String, v: Any) => FilterApi.gt(
+ (n: Array[String], v: Any) => FilterApi.gt(
longColumn(n),
v.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong])
case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal =>
- (n: String, v: Any) =>
+ (n: Array[String], v: Any) =>
FilterApi.gt(intColumn(n), decimalToInt32(v.asInstanceOf[JBigDecimal]))
case ParquetSchemaType(DECIMAL, INT64, _, _) if pushDownDecimal =>
- (n: String, v: Any) =>
+ (n: Array[String], v: Any) =>
FilterApi.gt(longColumn(n),
decimalToInt64(v.asInstanceOf[JBigDecimal]))
case ParquetSchemaType(DECIMAL, FIXED_LEN_BYTE_ARRAY, length, _) if
pushDownDecimal =>
- (n: String, v: Any) =>
+ (n: Array[String], v: Any) =>
FilterApi.gt(binaryColumn(n),
decimalToByteArray(v.asInstanceOf[JBigDecimal], length))
}
- private val makeGtEq: PartialFunction[ParquetSchemaType, (String, Any) =>
FilterPredicate] = {
+ private val makeGtEq:
+ PartialFunction[ParquetSchemaType, (Array[String], Any) =>
FilterPredicate] = {
case ParquetByteType | ParquetShortType | ParquetIntegerType =>
- (n: String, v: Any) =>
+ (n: Array[String], v: Any) =>
FilterApi.gtEq(intColumn(n),
v.asInstanceOf[Number].intValue.asInstanceOf[Integer])
case ParquetLongType =>
- (n: String, v: Any) => FilterApi.gtEq(longColumn(n),
v.asInstanceOf[JLong])
+ (n: Array[String], v: Any) => FilterApi.gtEq(longColumn(n),
v.asInstanceOf[JLong])
case ParquetFloatType =>
- (n: String, v: Any) => FilterApi.gtEq(floatColumn(n),
v.asInstanceOf[JFloat])
+ (n: Array[String], v: Any) => FilterApi.gtEq(floatColumn(n),
v.asInstanceOf[JFloat])
case ParquetDoubleType =>
- (n: String, v: Any) => FilterApi.gtEq(doubleColumn(n),
v.asInstanceOf[JDouble])
+ (n: Array[String], v: Any) => FilterApi.gtEq(doubleColumn(n),
v.asInstanceOf[JDouble])
case ParquetStringType =>
- (n: String, v: Any) =>
+ (n: Array[String], v: Any) =>
FilterApi.gtEq(binaryColumn(n),
Binary.fromString(v.asInstanceOf[String]))
case ParquetBinaryType =>
- (n: String, v: Any) =>
+ (n: Array[String], v: Any) =>
FilterApi.gtEq(binaryColumn(n),
Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
case ParquetDateType if pushDownDate =>
- (n: String, v: Any) =>
+ (n: Array[String], v: Any) =>
FilterApi.gtEq(intColumn(n),
dateToDays(v.asInstanceOf[Date]).asInstanceOf[Integer])
case ParquetTimestampMicrosType if pushDownTimestamp =>
- (n: String, v: Any) => FilterApi.gtEq(
+ (n: Array[String], v: Any) => FilterApi.gtEq(
longColumn(n),
DateTimeUtils.fromJavaTimestamp(v.asInstanceOf[Timestamp]).asInstanceOf[JLong])
case ParquetTimestampMillisType if pushDownTimestamp =>
- (n: String, v: Any) => FilterApi.gtEq(
+ (n: Array[String], v: Any) => FilterApi.gtEq(
longColumn(n),
v.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong])
case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal =>
- (n: String, v: Any) =>
+ (n: Array[String], v: Any) =>
FilterApi.gtEq(intColumn(n),
decimalToInt32(v.asInstanceOf[JBigDecimal]))
case ParquetSchemaType(DECIMAL, INT64, _, _) if pushDownDecimal =>
- (n: String, v: Any) =>
+ (n: Array[String], v: Any) =>
FilterApi.gtEq(longColumn(n),
decimalToInt64(v.asInstanceOf[JBigDecimal]))
case ParquetSchemaType(DECIMAL, FIXED_LEN_BYTE_ARRAY, length, _) if
pushDownDecimal =>
- (n: String, v: Any) =>
+ (n: Array[String], v: Any) =>
FilterApi.gtEq(binaryColumn(n),
decimalToByteArray(v.asInstanceOf[JBigDecimal], length))
}
@@ -503,38 +509,38 @@ class ParquetFilters(
predicate match {
case sources.IsNull(name) if canMakeFilterOn(name, null) =>
makeEq.lift(nameToParquetField(name).fieldType)
- .map(_(nameToParquetField(name).fieldName, null))
+ .map(_(Array(nameToParquetField(name).fieldName), null))
case sources.IsNotNull(name) if canMakeFilterOn(name, null) =>
makeNotEq.lift(nameToParquetField(name).fieldType)
- .map(_(nameToParquetField(name).fieldName, null))
+ .map(_(Array(nameToParquetField(name).fieldName), null))
case sources.EqualTo(name, value) if canMakeFilterOn(name, value) =>
makeEq.lift(nameToParquetField(name).fieldType)
- .map(_(nameToParquetField(name).fieldName, value))
+ .map(_(Array(nameToParquetField(name).fieldName), value))
case sources.Not(sources.EqualTo(name, value)) if canMakeFilterOn(name,
value) =>
makeNotEq.lift(nameToParquetField(name).fieldType)
- .map(_(nameToParquetField(name).fieldName, value))
+ .map(_(Array(nameToParquetField(name).fieldName), value))
case sources.EqualNullSafe(name, value) if canMakeFilterOn(name, value)
=>
makeEq.lift(nameToParquetField(name).fieldType)
- .map(_(nameToParquetField(name).fieldName, value))
+ .map(_(Array(nameToParquetField(name).fieldName), value))
case sources.Not(sources.EqualNullSafe(name, value)) if
canMakeFilterOn(name, value) =>
makeNotEq.lift(nameToParquetField(name).fieldType)
- .map(_(nameToParquetField(name).fieldName, value))
+ .map(_(Array(nameToParquetField(name).fieldName), value))
case sources.LessThan(name, value) if canMakeFilterOn(name, value) =>
makeLt.lift(nameToParquetField(name).fieldType)
- .map(_(nameToParquetField(name).fieldName, value))
+ .map(_(Array(nameToParquetField(name).fieldName), value))
case sources.LessThanOrEqual(name, value) if canMakeFilterOn(name,
value) =>
makeLtEq.lift(nameToParquetField(name).fieldType)
- .map(_(nameToParquetField(name).fieldName, value))
+ .map(_(Array(nameToParquetField(name).fieldName), value))
case sources.GreaterThan(name, value) if canMakeFilterOn(name, value) =>
makeGt.lift(nameToParquetField(name).fieldType)
- .map(_(nameToParquetField(name).fieldName, value))
+ .map(_(Array(nameToParquetField(name).fieldName), value))
case sources.GreaterThanOrEqual(name, value) if canMakeFilterOn(name,
value) =>
makeGtEq.lift(nameToParquetField(name).fieldType)
- .map(_(nameToParquetField(name).fieldName, value))
+ .map(_(Array(nameToParquetField(name).fieldName), value))
case sources.And(lhs, rhs) =>
// At here, it is not safe to just convert one side and remove the
other side
@@ -585,13 +591,13 @@ class ParquetFilters(
&& values.distinct.length <= pushDownInFilterThreshold =>
values.distinct.flatMap { v =>
makeEq.lift(nameToParquetField(name).fieldType)
- .map(_(nameToParquetField(name).fieldName, v))
+ .map(_(Array(nameToParquetField(name).fieldName), v))
}.reduceLeftOption(FilterApi.or)
case sources.StringStartsWith(name, prefix)
if pushDownStartWith && canMakeFilterOn(name, prefix) =>
Option(prefix).map { v =>
-
FilterApi.userDefined(binaryColumn(nameToParquetField(name).fieldName),
+
FilterApi.userDefined(binaryColumn(Array(nameToParquetField(name).fieldName)),
new UserDefinedPredicate[Binary] with Serializable {
private val strToBinary = Binary.fromReusedByteArray(v.getBytes)
private val size = strToBinary.length
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]