This is an automated email from the ASF dual-hosted git repository. dbtsai pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new fc45b27 [SPARK-31064][SQL] New Parquet Predicate Filter APIs with multi-part Identifier Support fc45b27 is described below commit fc45b274df2096e0d6340974feafb17f1b2f2cc4 Author: DB Tsai <d_t...@apple.com> AuthorDate: Fri Mar 6 21:09:24 2020 +0000 [SPARK-31064][SQL] New Parquet Predicate Filter APIs with multi-part Identifier Support ### What changes were proposed in this pull request? Parquet's org.apache.parquet.filter2.predicate.FilterApi uses `dots` as separators to split the column name into the parts of a nested field path. The drawback is that this causes issues when a field name itself contains `dots`. The new APIs take an array of strings directly, one element per part of the nested field path, so there is no ambiguity of the kind that arises when `dots` are used as separators. ### Why are the changes needed? To support nested predicate pushdown and predicate pushdown for columns containing `dots`. ### Does this PR introduce any user-facing change? No. ### How was this patch tested? Existing UTs. Closes #27824 from dbtsai/SPARK-31064. Authored-by: DB Tsai <d_t...@apple.com> Signed-off-by: DB Tsai <d_t...@apple.com> (cherry picked from commit 7911f952029682e12fd1f90665f2869e1461c365) Signed-off-by: DB Tsai <d_t...@apple.com> --- .../parquet/filter2/predicate/SparkFilterApi.java | 57 ++++++ .../datasources/parquet/ParquetFilters.scala | 192 +++++++++++---------- 2 files changed, 156 insertions(+), 93 deletions(-) diff --git a/sql/core/src/main/java/org/apache/parquet/filter2/predicate/SparkFilterApi.java b/sql/core/src/main/java/org/apache/parquet/filter2/predicate/SparkFilterApi.java new file mode 100644 index 0000000..884042c --- /dev/null +++ b/sql/core/src/main/java/org/apache/parquet/filter2/predicate/SparkFilterApi.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.parquet.filter2.predicate; + +import org.apache.parquet.hadoop.metadata.ColumnPath; +import org.apache.parquet.filter2.predicate.Operators.BinaryColumn; +import org.apache.parquet.filter2.predicate.Operators.BooleanColumn; +import org.apache.parquet.filter2.predicate.Operators.DoubleColumn; +import org.apache.parquet.filter2.predicate.Operators.FloatColumn; +import org.apache.parquet.filter2.predicate.Operators.IntColumn; +import org.apache.parquet.filter2.predicate.Operators.LongColumn; + +/** + * TODO (PARQUET-1809): This is a temporary workaround; it is intended to be moved to Parquet. 
+ */ +public final class SparkFilterApi { + public static IntColumn intColumn(String[] path) { + return new IntColumn(ColumnPath.get(path)); + } + + public static LongColumn longColumn(String[] path) { + return new LongColumn(ColumnPath.get(path)); + } + + public static FloatColumn floatColumn(String[] path) { + return new FloatColumn(ColumnPath.get(path)); + } + + public static DoubleColumn doubleColumn(String[] path) { + return new DoubleColumn(ColumnPath.get(path)); + } + + public static BooleanColumn booleanColumn(String[] path) { + return new BooleanColumn(ColumnPath.get(path)); + } + + public static BinaryColumn binaryColumn(String[] path) { + return new BinaryColumn(ColumnPath.get(path)); + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala index 948a120..0706501 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala @@ -25,7 +25,7 @@ import java.util.Locale import scala.collection.JavaConverters.asScalaBufferConverter import org.apache.parquet.filter2.predicate._ -import org.apache.parquet.filter2.predicate.FilterApi._ +import org.apache.parquet.filter2.predicate.SparkFilterApi._ import org.apache.parquet.io.api.Binary import org.apache.parquet.schema.{DecimalMetadata, MessageType, OriginalType, PrimitiveComparator} import org.apache.parquet.schema.OriginalType._ @@ -126,264 +126,270 @@ class ParquetFilters( Binary.fromConstantByteArray(fixedLengthBytes, 0, numBytes) } - private val makeEq: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = { + private val makeEq: + PartialFunction[ParquetSchemaType, (Array[String], Any) => FilterPredicate] = { case ParquetBooleanType => - (n: String, v: Any) => FilterApi.eq(booleanColumn(n), 
v.asInstanceOf[JBoolean]) + (n: Array[String], v: Any) => FilterApi.eq(booleanColumn(n), v.asInstanceOf[JBoolean]) case ParquetByteType | ParquetShortType | ParquetIntegerType => - (n: String, v: Any) => FilterApi.eq( + (n: Array[String], v: Any) => FilterApi.eq( intColumn(n), Option(v).map(_.asInstanceOf[Number].intValue.asInstanceOf[Integer]).orNull) case ParquetLongType => - (n: String, v: Any) => FilterApi.eq(longColumn(n), v.asInstanceOf[JLong]) + (n: Array[String], v: Any) => FilterApi.eq(longColumn(n), v.asInstanceOf[JLong]) case ParquetFloatType => - (n: String, v: Any) => FilterApi.eq(floatColumn(n), v.asInstanceOf[JFloat]) + (n: Array[String], v: Any) => FilterApi.eq(floatColumn(n), v.asInstanceOf[JFloat]) case ParquetDoubleType => - (n: String, v: Any) => FilterApi.eq(doubleColumn(n), v.asInstanceOf[JDouble]) + (n: Array[String], v: Any) => FilterApi.eq(doubleColumn(n), v.asInstanceOf[JDouble]) // Binary.fromString and Binary.fromByteArray don't accept null values case ParquetStringType => - (n: String, v: Any) => FilterApi.eq( + (n: Array[String], v: Any) => FilterApi.eq( binaryColumn(n), Option(v).map(s => Binary.fromString(s.asInstanceOf[String])).orNull) case ParquetBinaryType => - (n: String, v: Any) => FilterApi.eq( + (n: Array[String], v: Any) => FilterApi.eq( binaryColumn(n), Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull) case ParquetDateType if pushDownDate => - (n: String, v: Any) => FilterApi.eq( + (n: Array[String], v: Any) => FilterApi.eq( intColumn(n), Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull) case ParquetTimestampMicrosType if pushDownTimestamp => - (n: String, v: Any) => FilterApi.eq( + (n: Array[String], v: Any) => FilterApi.eq( longColumn(n), Option(v).map(t => DateTimeUtils.fromJavaTimestamp(t.asInstanceOf[Timestamp]) .asInstanceOf[JLong]).orNull) case ParquetTimestampMillisType if pushDownTimestamp => - (n: String, v: Any) => FilterApi.eq( + (n: 
Array[String], v: Any) => FilterApi.eq( longColumn(n), Option(v).map(_.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong]).orNull) case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal => - (n: String, v: Any) => FilterApi.eq( + (n: Array[String], v: Any) => FilterApi.eq( intColumn(n), Option(v).map(d => decimalToInt32(d.asInstanceOf[JBigDecimal])).orNull) case ParquetSchemaType(DECIMAL, INT64, _, _) if pushDownDecimal => - (n: String, v: Any) => FilterApi.eq( + (n: Array[String], v: Any) => FilterApi.eq( longColumn(n), Option(v).map(d => decimalToInt64(d.asInstanceOf[JBigDecimal])).orNull) case ParquetSchemaType(DECIMAL, FIXED_LEN_BYTE_ARRAY, length, _) if pushDownDecimal => - (n: String, v: Any) => FilterApi.eq( + (n: Array[String], v: Any) => FilterApi.eq( binaryColumn(n), Option(v).map(d => decimalToByteArray(d.asInstanceOf[JBigDecimal], length)).orNull) } - private val makeNotEq: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = { + private val makeNotEq: + PartialFunction[ParquetSchemaType, (Array[String], Any) => FilterPredicate] = { case ParquetBooleanType => - (n: String, v: Any) => FilterApi.notEq(booleanColumn(n), v.asInstanceOf[JBoolean]) + (n: Array[String], v: Any) => FilterApi.notEq(booleanColumn(n), v.asInstanceOf[JBoolean]) case ParquetByteType | ParquetShortType | ParquetIntegerType => - (n: String, v: Any) => FilterApi.notEq( + (n: Array[String], v: Any) => FilterApi.notEq( intColumn(n), Option(v).map(_.asInstanceOf[Number].intValue.asInstanceOf[Integer]).orNull) case ParquetLongType => - (n: String, v: Any) => FilterApi.notEq(longColumn(n), v.asInstanceOf[JLong]) + (n: Array[String], v: Any) => FilterApi.notEq(longColumn(n), v.asInstanceOf[JLong]) case ParquetFloatType => - (n: String, v: Any) => FilterApi.notEq(floatColumn(n), v.asInstanceOf[JFloat]) + (n: Array[String], v: Any) => FilterApi.notEq(floatColumn(n), v.asInstanceOf[JFloat]) case ParquetDoubleType => - (n: String, v: Any) => 
FilterApi.notEq(doubleColumn(n), v.asInstanceOf[JDouble]) + (n: Array[String], v: Any) => FilterApi.notEq(doubleColumn(n), v.asInstanceOf[JDouble]) case ParquetStringType => - (n: String, v: Any) => FilterApi.notEq( + (n: Array[String], v: Any) => FilterApi.notEq( binaryColumn(n), Option(v).map(s => Binary.fromString(s.asInstanceOf[String])).orNull) case ParquetBinaryType => - (n: String, v: Any) => FilterApi.notEq( + (n: Array[String], v: Any) => FilterApi.notEq( binaryColumn(n), Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull) case ParquetDateType if pushDownDate => - (n: String, v: Any) => FilterApi.notEq( + (n: Array[String], v: Any) => FilterApi.notEq( intColumn(n), Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull) case ParquetTimestampMicrosType if pushDownTimestamp => - (n: String, v: Any) => FilterApi.notEq( + (n: Array[String], v: Any) => FilterApi.notEq( longColumn(n), Option(v).map(t => DateTimeUtils.fromJavaTimestamp(t.asInstanceOf[Timestamp]) .asInstanceOf[JLong]).orNull) case ParquetTimestampMillisType if pushDownTimestamp => - (n: String, v: Any) => FilterApi.notEq( + (n: Array[String], v: Any) => FilterApi.notEq( longColumn(n), Option(v).map(_.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong]).orNull) case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal => - (n: String, v: Any) => FilterApi.notEq( + (n: Array[String], v: Any) => FilterApi.notEq( intColumn(n), Option(v).map(d => decimalToInt32(d.asInstanceOf[JBigDecimal])).orNull) case ParquetSchemaType(DECIMAL, INT64, _, _) if pushDownDecimal => - (n: String, v: Any) => FilterApi.notEq( + (n: Array[String], v: Any) => FilterApi.notEq( longColumn(n), Option(v).map(d => decimalToInt64(d.asInstanceOf[JBigDecimal])).orNull) case ParquetSchemaType(DECIMAL, FIXED_LEN_BYTE_ARRAY, length, _) if pushDownDecimal => - (n: String, v: Any) => FilterApi.notEq( + (n: Array[String], v: Any) => FilterApi.notEq( binaryColumn(n), 
Option(v).map(d => decimalToByteArray(d.asInstanceOf[JBigDecimal], length)).orNull) } - private val makeLt: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = { + private val makeLt: + PartialFunction[ParquetSchemaType, (Array[String], Any) => FilterPredicate] = { case ParquetByteType | ParquetShortType | ParquetIntegerType => - (n: String, v: Any) => + (n: Array[String], v: Any) => FilterApi.lt(intColumn(n), v.asInstanceOf[Number].intValue.asInstanceOf[Integer]) case ParquetLongType => - (n: String, v: Any) => FilterApi.lt(longColumn(n), v.asInstanceOf[JLong]) + (n: Array[String], v: Any) => FilterApi.lt(longColumn(n), v.asInstanceOf[JLong]) case ParquetFloatType => - (n: String, v: Any) => FilterApi.lt(floatColumn(n), v.asInstanceOf[JFloat]) + (n: Array[String], v: Any) => FilterApi.lt(floatColumn(n), v.asInstanceOf[JFloat]) case ParquetDoubleType => - (n: String, v: Any) => FilterApi.lt(doubleColumn(n), v.asInstanceOf[JDouble]) + (n: Array[String], v: Any) => FilterApi.lt(doubleColumn(n), v.asInstanceOf[JDouble]) case ParquetStringType => - (n: String, v: Any) => + (n: Array[String], v: Any) => FilterApi.lt(binaryColumn(n), Binary.fromString(v.asInstanceOf[String])) case ParquetBinaryType => - (n: String, v: Any) => + (n: Array[String], v: Any) => FilterApi.lt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])) case ParquetDateType if pushDownDate => - (n: String, v: Any) => + (n: Array[String], v: Any) => FilterApi.lt(intColumn(n), dateToDays(v.asInstanceOf[Date]).asInstanceOf[Integer]) case ParquetTimestampMicrosType if pushDownTimestamp => - (n: String, v: Any) => FilterApi.lt( + (n: Array[String], v: Any) => FilterApi.lt( longColumn(n), DateTimeUtils.fromJavaTimestamp(v.asInstanceOf[Timestamp]).asInstanceOf[JLong]) case ParquetTimestampMillisType if pushDownTimestamp => - (n: String, v: Any) => FilterApi.lt( + (n: Array[String], v: Any) => FilterApi.lt( longColumn(n), 
v.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong]) case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal => - (n: String, v: Any) => + (n: Array[String], v: Any) => FilterApi.lt(intColumn(n), decimalToInt32(v.asInstanceOf[JBigDecimal])) case ParquetSchemaType(DECIMAL, INT64, _, _) if pushDownDecimal => - (n: String, v: Any) => + (n: Array[String], v: Any) => FilterApi.lt(longColumn(n), decimalToInt64(v.asInstanceOf[JBigDecimal])) case ParquetSchemaType(DECIMAL, FIXED_LEN_BYTE_ARRAY, length, _) if pushDownDecimal => - (n: String, v: Any) => + (n: Array[String], v: Any) => FilterApi.lt(binaryColumn(n), decimalToByteArray(v.asInstanceOf[JBigDecimal], length)) } - private val makeLtEq: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = { + private val makeLtEq: + PartialFunction[ParquetSchemaType, (Array[String], Any) => FilterPredicate] = { case ParquetByteType | ParquetShortType | ParquetIntegerType => - (n: String, v: Any) => + (n: Array[String], v: Any) => FilterApi.ltEq(intColumn(n), v.asInstanceOf[Number].intValue.asInstanceOf[Integer]) case ParquetLongType => - (n: String, v: Any) => FilterApi.ltEq(longColumn(n), v.asInstanceOf[JLong]) + (n: Array[String], v: Any) => FilterApi.ltEq(longColumn(n), v.asInstanceOf[JLong]) case ParquetFloatType => - (n: String, v: Any) => FilterApi.ltEq(floatColumn(n), v.asInstanceOf[JFloat]) + (n: Array[String], v: Any) => FilterApi.ltEq(floatColumn(n), v.asInstanceOf[JFloat]) case ParquetDoubleType => - (n: String, v: Any) => FilterApi.ltEq(doubleColumn(n), v.asInstanceOf[JDouble]) + (n: Array[String], v: Any) => FilterApi.ltEq(doubleColumn(n), v.asInstanceOf[JDouble]) case ParquetStringType => - (n: String, v: Any) => + (n: Array[String], v: Any) => FilterApi.ltEq(binaryColumn(n), Binary.fromString(v.asInstanceOf[String])) case ParquetBinaryType => - (n: String, v: Any) => + (n: Array[String], v: Any) => FilterApi.ltEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])) case 
ParquetDateType if pushDownDate => - (n: String, v: Any) => + (n: Array[String], v: Any) => FilterApi.ltEq(intColumn(n), dateToDays(v.asInstanceOf[Date]).asInstanceOf[Integer]) case ParquetTimestampMicrosType if pushDownTimestamp => - (n: String, v: Any) => FilterApi.ltEq( + (n: Array[String], v: Any) => FilterApi.ltEq( longColumn(n), DateTimeUtils.fromJavaTimestamp(v.asInstanceOf[Timestamp]).asInstanceOf[JLong]) case ParquetTimestampMillisType if pushDownTimestamp => - (n: String, v: Any) => FilterApi.ltEq( + (n: Array[String], v: Any) => FilterApi.ltEq( longColumn(n), v.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong]) case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal => - (n: String, v: Any) => + (n: Array[String], v: Any) => FilterApi.ltEq(intColumn(n), decimalToInt32(v.asInstanceOf[JBigDecimal])) case ParquetSchemaType(DECIMAL, INT64, _, _) if pushDownDecimal => - (n: String, v: Any) => + (n: Array[String], v: Any) => FilterApi.ltEq(longColumn(n), decimalToInt64(v.asInstanceOf[JBigDecimal])) case ParquetSchemaType(DECIMAL, FIXED_LEN_BYTE_ARRAY, length, _) if pushDownDecimal => - (n: String, v: Any) => + (n: Array[String], v: Any) => FilterApi.ltEq(binaryColumn(n), decimalToByteArray(v.asInstanceOf[JBigDecimal], length)) } - private val makeGt: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = { + private val makeGt: + PartialFunction[ParquetSchemaType, (Array[String], Any) => FilterPredicate] = { case ParquetByteType | ParquetShortType | ParquetIntegerType => - (n: String, v: Any) => + (n: Array[String], v: Any) => FilterApi.gt(intColumn(n), v.asInstanceOf[Number].intValue.asInstanceOf[Integer]) case ParquetLongType => - (n: String, v: Any) => FilterApi.gt(longColumn(n), v.asInstanceOf[JLong]) + (n: Array[String], v: Any) => FilterApi.gt(longColumn(n), v.asInstanceOf[JLong]) case ParquetFloatType => - (n: String, v: Any) => FilterApi.gt(floatColumn(n), v.asInstanceOf[JFloat]) + (n: Array[String], v: Any) => 
FilterApi.gt(floatColumn(n), v.asInstanceOf[JFloat]) case ParquetDoubleType => - (n: String, v: Any) => FilterApi.gt(doubleColumn(n), v.asInstanceOf[JDouble]) + (n: Array[String], v: Any) => FilterApi.gt(doubleColumn(n), v.asInstanceOf[JDouble]) case ParquetStringType => - (n: String, v: Any) => + (n: Array[String], v: Any) => FilterApi.gt(binaryColumn(n), Binary.fromString(v.asInstanceOf[String])) case ParquetBinaryType => - (n: String, v: Any) => + (n: Array[String], v: Any) => FilterApi.gt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])) case ParquetDateType if pushDownDate => - (n: String, v: Any) => + (n: Array[String], v: Any) => FilterApi.gt(intColumn(n), dateToDays(v.asInstanceOf[Date]).asInstanceOf[Integer]) case ParquetTimestampMicrosType if pushDownTimestamp => - (n: String, v: Any) => FilterApi.gt( + (n: Array[String], v: Any) => FilterApi.gt( longColumn(n), DateTimeUtils.fromJavaTimestamp(v.asInstanceOf[Timestamp]).asInstanceOf[JLong]) case ParquetTimestampMillisType if pushDownTimestamp => - (n: String, v: Any) => FilterApi.gt( + (n: Array[String], v: Any) => FilterApi.gt( longColumn(n), v.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong]) case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal => - (n: String, v: Any) => + (n: Array[String], v: Any) => FilterApi.gt(intColumn(n), decimalToInt32(v.asInstanceOf[JBigDecimal])) case ParquetSchemaType(DECIMAL, INT64, _, _) if pushDownDecimal => - (n: String, v: Any) => + (n: Array[String], v: Any) => FilterApi.gt(longColumn(n), decimalToInt64(v.asInstanceOf[JBigDecimal])) case ParquetSchemaType(DECIMAL, FIXED_LEN_BYTE_ARRAY, length, _) if pushDownDecimal => - (n: String, v: Any) => + (n: Array[String], v: Any) => FilterApi.gt(binaryColumn(n), decimalToByteArray(v.asInstanceOf[JBigDecimal], length)) } - private val makeGtEq: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = { + private val makeGtEq: + PartialFunction[ParquetSchemaType, (Array[String], 
Any) => FilterPredicate] = { case ParquetByteType | ParquetShortType | ParquetIntegerType => - (n: String, v: Any) => + (n: Array[String], v: Any) => FilterApi.gtEq(intColumn(n), v.asInstanceOf[Number].intValue.asInstanceOf[Integer]) case ParquetLongType => - (n: String, v: Any) => FilterApi.gtEq(longColumn(n), v.asInstanceOf[JLong]) + (n: Array[String], v: Any) => FilterApi.gtEq(longColumn(n), v.asInstanceOf[JLong]) case ParquetFloatType => - (n: String, v: Any) => FilterApi.gtEq(floatColumn(n), v.asInstanceOf[JFloat]) + (n: Array[String], v: Any) => FilterApi.gtEq(floatColumn(n), v.asInstanceOf[JFloat]) case ParquetDoubleType => - (n: String, v: Any) => FilterApi.gtEq(doubleColumn(n), v.asInstanceOf[JDouble]) + (n: Array[String], v: Any) => FilterApi.gtEq(doubleColumn(n), v.asInstanceOf[JDouble]) case ParquetStringType => - (n: String, v: Any) => + (n: Array[String], v: Any) => FilterApi.gtEq(binaryColumn(n), Binary.fromString(v.asInstanceOf[String])) case ParquetBinaryType => - (n: String, v: Any) => + (n: Array[String], v: Any) => FilterApi.gtEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])) case ParquetDateType if pushDownDate => - (n: String, v: Any) => + (n: Array[String], v: Any) => FilterApi.gtEq(intColumn(n), dateToDays(v.asInstanceOf[Date]).asInstanceOf[Integer]) case ParquetTimestampMicrosType if pushDownTimestamp => - (n: String, v: Any) => FilterApi.gtEq( + (n: Array[String], v: Any) => FilterApi.gtEq( longColumn(n), DateTimeUtils.fromJavaTimestamp(v.asInstanceOf[Timestamp]).asInstanceOf[JLong]) case ParquetTimestampMillisType if pushDownTimestamp => - (n: String, v: Any) => FilterApi.gtEq( + (n: Array[String], v: Any) => FilterApi.gtEq( longColumn(n), v.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong]) case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal => - (n: String, v: Any) => + (n: Array[String], v: Any) => FilterApi.gtEq(intColumn(n), decimalToInt32(v.asInstanceOf[JBigDecimal])) case 
ParquetSchemaType(DECIMAL, INT64, _, _) if pushDownDecimal => - (n: String, v: Any) => + (n: Array[String], v: Any) => FilterApi.gtEq(longColumn(n), decimalToInt64(v.asInstanceOf[JBigDecimal])) case ParquetSchemaType(DECIMAL, FIXED_LEN_BYTE_ARRAY, length, _) if pushDownDecimal => - (n: String, v: Any) => + (n: Array[String], v: Any) => FilterApi.gtEq(binaryColumn(n), decimalToByteArray(v.asInstanceOf[JBigDecimal], length)) } @@ -503,38 +509,38 @@ class ParquetFilters( predicate match { case sources.IsNull(name) if canMakeFilterOn(name, null) => makeEq.lift(nameToParquetField(name).fieldType) - .map(_(nameToParquetField(name).fieldName, null)) + .map(_(Array(nameToParquetField(name).fieldName), null)) case sources.IsNotNull(name) if canMakeFilterOn(name, null) => makeNotEq.lift(nameToParquetField(name).fieldType) - .map(_(nameToParquetField(name).fieldName, null)) + .map(_(Array(nameToParquetField(name).fieldName), null)) case sources.EqualTo(name, value) if canMakeFilterOn(name, value) => makeEq.lift(nameToParquetField(name).fieldType) - .map(_(nameToParquetField(name).fieldName, value)) + .map(_(Array(nameToParquetField(name).fieldName), value)) case sources.Not(sources.EqualTo(name, value)) if canMakeFilterOn(name, value) => makeNotEq.lift(nameToParquetField(name).fieldType) - .map(_(nameToParquetField(name).fieldName, value)) + .map(_(Array(nameToParquetField(name).fieldName), value)) case sources.EqualNullSafe(name, value) if canMakeFilterOn(name, value) => makeEq.lift(nameToParquetField(name).fieldType) - .map(_(nameToParquetField(name).fieldName, value)) + .map(_(Array(nameToParquetField(name).fieldName), value)) case sources.Not(sources.EqualNullSafe(name, value)) if canMakeFilterOn(name, value) => makeNotEq.lift(nameToParquetField(name).fieldType) - .map(_(nameToParquetField(name).fieldName, value)) + .map(_(Array(nameToParquetField(name).fieldName), value)) case sources.LessThan(name, value) if canMakeFilterOn(name, value) => 
makeLt.lift(nameToParquetField(name).fieldType) - .map(_(nameToParquetField(name).fieldName, value)) + .map(_(Array(nameToParquetField(name).fieldName), value)) case sources.LessThanOrEqual(name, value) if canMakeFilterOn(name, value) => makeLtEq.lift(nameToParquetField(name).fieldType) - .map(_(nameToParquetField(name).fieldName, value)) + .map(_(Array(nameToParquetField(name).fieldName), value)) case sources.GreaterThan(name, value) if canMakeFilterOn(name, value) => makeGt.lift(nameToParquetField(name).fieldType) - .map(_(nameToParquetField(name).fieldName, value)) + .map(_(Array(nameToParquetField(name).fieldName), value)) case sources.GreaterThanOrEqual(name, value) if canMakeFilterOn(name, value) => makeGtEq.lift(nameToParquetField(name).fieldType) - .map(_(nameToParquetField(name).fieldName, value)) + .map(_(Array(nameToParquetField(name).fieldName), value)) case sources.And(lhs, rhs) => // At here, it is not safe to just convert one side and remove the other side @@ -585,13 +591,13 @@ class ParquetFilters( && values.distinct.length <= pushDownInFilterThreshold => values.distinct.flatMap { v => makeEq.lift(nameToParquetField(name).fieldType) - .map(_(nameToParquetField(name).fieldName, v)) + .map(_(Array(nameToParquetField(name).fieldName), v)) }.reduceLeftOption(FilterApi.or) case sources.StringStartsWith(name, prefix) if pushDownStartWith && canMakeFilterOn(name, prefix) => Option(prefix).map { v => - FilterApi.userDefined(binaryColumn(nameToParquetField(name).fieldName), + FilterApi.userDefined(binaryColumn(Array(nameToParquetField(name).fieldName)), new UserDefinedPredicate[Binary] with Serializable { private val strToBinary = Binary.fromReusedByteArray(v.getBytes) private val size = strToBinary.length --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org