This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 7b6e1d5 [SPARK-25557][SQL] Nested column predicate pushdown for ORC
7b6e1d5 is described below
commit 7b6e1d5cec60dbb43c6fbc1528a207cbff604a5d
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Fri Aug 7 08:07:41 2020 -0700
[SPARK-25557][SQL] Nested column predicate pushdown for ORC
### What changes were proposed in this pull request?
We added nested column predicate pushdown for Parquet in #27728. This patch
extends the feature support to ORC.
### Why are the changes needed?
This extends nested column predicate pushdown to ORC for feature parity with
Parquet, and improves performance for queries that filter on nested ORC columns.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Unit tests.
Closes #28761 from viirya/SPARK-25557.
Authored-by: Liang-Chi Hsieh <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../org/apache/spark/sql/internal/SQLConf.scala | 6 +-
.../execution/datasources/DataSourceStrategy.scala | 2 +
.../execution/datasources/orc/OrcFiltersBase.scala | 50 +++-
.../datasources/v2/orc/OrcScanBuilder.scala | 7 +-
.../datasources/FileBasedDataSourceTest.scala | 40 ++-
.../sql/execution/datasources/orc/OrcTest.scala | 22 ++
.../datasources/parquet/ParquetFilterSuite.scala | 29 +--
.../sql/execution/datasources/orc/OrcFilters.scala | 28 +-
.../execution/datasources/orc/OrcFilterSuite.scala | 276 +++++++++++---------
.../sql/execution/datasources/orc/OrcFilters.scala | 28 +-
.../execution/datasources/orc/OrcFilterSuite.scala | 282 ++++++++++++---------
11 files changed, 460 insertions(+), 310 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index a69f09b..196971a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2108,9 +2108,9 @@ object SQLConf {
.doc("A comma-separated list of data source short names or fully
qualified data source " +
"implementation class names for which Spark tries to push down
predicates for nested " +
"columns and/or names containing `dots` to data sources. This
configuration is only " +
- "effective with file-based data source in DSv1. Currently, Parquet
implements " +
- "both optimizations while ORC only supports predicates for names
containing `dots`. The " +
- "other data sources don't support this feature yet. So the default
value is 'parquet,orc'.")
+ "effective with file-based data sources in DSv1. Currently, Parquet
and ORC implement " +
+ "both optimizations. The other data sources don't support this feature
yet. So the " +
+ "default value is 'parquet,orc'.")
.version("3.0.0")
.stringConf
.createWithDefault("parquet,orc")
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 23454d7..ada04c2 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -668,6 +668,8 @@ abstract class PushableColumnBase {
import
org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
def helper(e: Expression): Option[Seq[String]] = e match {
case a: Attribute =>
+ // Attribute that contains dot "." in name is supported only when
+ // nested predicate pushdown is enabled.
if (nestedPredicatePushdownEnabled || !a.name.contains(".")) {
Some(Seq(a.name))
} else {
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFiltersBase.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFiltersBase.scala
index e673309..b277b4d 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFiltersBase.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFiltersBase.scala
@@ -17,8 +17,11 @@
package org.apache.spark.sql.execution.datasources.orc
+import java.util.Locale
+
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.sources.{And, Filter}
-import org.apache.spark.sql.types.{AtomicType, BinaryType, DataType}
+import org.apache.spark.sql.types.{AtomicType, BinaryType, DataType,
StructField, StructType}
/**
* Methods that can be shared when upgrading the built-in Hive.
@@ -37,12 +40,45 @@ trait OrcFiltersBase {
}
/**
- * Return true if this is a searchable type in ORC.
- * Both CharType and VarcharType are cleaned at AstBuilder.
+ * This method returns a map which contains ORC field name and data type.
Each key
+ * represents a column; `dots` are used as separators for nested columns. If
any part
+ * of the names contains `dots`, it is quoted to avoid confusion. See
+ * `org.apache.spark.sql.connector.catalog.quoted` for implementation
details.
+ *
+ * BinaryType, UserDefinedType, ArrayType and MapType are ignored.
*/
- protected[sql] def isSearchableType(dataType: DataType) = dataType match {
- case BinaryType => false
- case _: AtomicType => true
- case _ => false
+ protected[sql] def getSearchableTypeMap(
+ schema: StructType,
+ caseSensitive: Boolean): Map[String, DataType] = {
+ import
org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
+
+ def getPrimitiveFields(
+ fields: Seq[StructField],
+ parentFieldNames: Seq[String] = Seq.empty): Seq[(String, DataType)] = {
+ fields.flatMap { f =>
+ f.dataType match {
+ case st: StructType =>
+ getPrimitiveFields(st.fields, parentFieldNames :+ f.name)
+ case BinaryType => None
+ case _: AtomicType =>
+ Some(((parentFieldNames :+ f.name).quoted, f.dataType))
+ case _ => None
+ }
+ }
+ }
+
+ val primitiveFields = getPrimitiveFields(schema.fields)
+ if (caseSensitive) {
+ primitiveFields.toMap
+ } else {
+ // Ambiguity is not resolved here: when more than one field matches in
case-insensitive
+ // mode, pushdown is simply skipped for those fields; they will trigger an
exception when reading.
+ // See: SPARK-25175.
+ val dedupPrimitiveFields = primitiveFields
+ .groupBy(_._1.toLowerCase(Locale.ROOT))
+ .filter(_._2.size == 1)
+ .mapValues(_.head._2)
+ CaseInsensitiveMap(dedupPrimitiveFields)
+ }
}
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala
index 9f40f5f..0330dac 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.connector.read.{Scan,
SupportsPushDownFilters}
import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
import org.apache.spark.sql.execution.datasources.orc.OrcFilters
import org.apache.spark.sql.execution.datasources.v2.FileScanBuilder
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -60,10 +61,8 @@ case class OrcScanBuilder(
// changed `hadoopConf` in executors.
OrcInputFormat.setSearchArgument(hadoopConf, f, schema.fieldNames)
}
- val dataTypeMap = schema.map(f => quoteIfNeeded(f.name) ->
f.dataType).toMap
- // TODO (SPARK-25557): ORC doesn't support nested predicate pushdown, so
they are removed.
- val newFilters = filters.filter(!_.containsNestedColumn)
- _pushedFilters = OrcFilters.convertibleFilters(schema, dataTypeMap,
newFilters).toArray
+ val dataTypeMap = OrcFilters.getSearchableTypeMap(schema,
SQLConf.get.caseSensitiveAnalysis)
+ _pushedFilters = OrcFilters.convertibleFilters(schema, dataTypeMap,
filters).toArray
}
filters
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileBasedDataSourceTest.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileBasedDataSourceTest.scala
index bdb161d..c2dc20b 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileBasedDataSourceTest.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileBasedDataSourceTest.scala
@@ -22,8 +22,10 @@ import java.io.File
import scala.reflect.ClassTag
import scala.reflect.runtime.universe.TypeTag
-import org.apache.spark.sql.{DataFrame, SaveMode}
+import org.apache.spark.sql.{DataFrame, Row, SaveMode}
+import org.apache.spark.sql.functions.struct
import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.types.StructType
/**
* A helper trait that provides convenient facilities for file-based data
source testing.
@@ -103,4 +105,40 @@ private[sql] trait FileBasedDataSourceTest extends
SQLTestUtils {
df: DataFrame, path: File): Unit = {
df.write.mode(SaveMode.Overwrite).format(dataSourceName).save(path.getCanonicalPath)
}
+
+ /**
+ * Takes a single-level `inputDF` dataframe and generates non-nested and multi-level
+ * nested dataframes as new test data, each paired with its column name and a function
+ * that wraps a value in `Row`s to the same depth. Callers write and read them back.
+ */
+ protected def withNestedDataFrame(inputDF: DataFrame): Seq[(DataFrame,
String, Any => Any)] = {
+ assert(inputDF.schema.fields.length == 1)
+ assert(!inputDF.schema.fields.head.dataType.isInstanceOf[StructType])
+ val df = inputDF.toDF("temp")
+ Seq(
+ (
+ df.withColumnRenamed("temp", "a"),
+ "a", // zero nesting
+ (x: Any) => x),
+ (
+ df.withColumn("a", struct(df("temp") as "b")).drop("temp"),
+ "a.b", // one level nesting
+ (x: Any) => Row(x)),
+ (
+ df.withColumn("a", struct(struct(df("temp") as "c") as
"b")).drop("temp"),
+ "a.b.c", // two level nesting
+ (x: Any) => Row(Row(x))
+ ),
+ (
+ df.withColumnRenamed("temp", "a.b"),
+ "`a.b`", // zero nesting with column name containing `dots`
+ (x: Any) => x
+ ),
+ (
+ df.withColumn("a.b", struct(df("temp") as "c.d") ).drop("temp"),
+ "`a.b`.`c.d`", // one level nesting with column names containing `dots`
+ (x: Any) => Row(x)
+ )
+ )
+ }
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala
index e929f90..aec61ac 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala
@@ -143,4 +143,26 @@ abstract class OrcTest extends QueryTest with
FileBasedDataSourceTest with Befor
FileUtils.copyURLToFile(url, file)
spark.read.orc(file.getAbsolutePath)
}
+
+ /**
+ * Takes a sequence of products `data` to generate multi-level nested
+ * dataframes as new test data. It tests both non-nested and nested
dataframes
+ * which are written and read back with Orc datasource.
+ *
+ * This is different from [[withOrcDataFrame]] which does not
+ * test nested cases.
+ */
+ protected def withNestedOrcDataFrame[T <: Product: ClassTag: TypeTag](data:
Seq[T])
+ (runTest: (DataFrame, String, Any => Any) => Unit): Unit =
+ withNestedOrcDataFrame(spark.createDataFrame(data))(runTest)
+
+ protected def withNestedOrcDataFrame(inputDF: DataFrame)
+ (runTest: (DataFrame, String, Any => Any) => Unit): Unit = {
+ withNestedDataFrame(inputDF).foreach { case (newDF, colName, resultFun) =>
+ withTempPath { file =>
+ newDF.write.format(dataSourceName).save(file.getCanonicalPath)
+ readFile(file.getCanonicalPath, true) { df => runTest(df, colName,
resultFun) }
+ }
+ }
+ }
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 8b922aa..5689b9d 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -122,34 +122,7 @@ abstract class ParquetFilterSuite extends QueryTest with
ParquetTest with Shared
private def withNestedParquetDataFrame(inputDF: DataFrame)
(runTest: (DataFrame, String, Any => Any) => Unit): Unit = {
- assert(inputDF.schema.fields.length == 1)
- assert(!inputDF.schema.fields.head.dataType.isInstanceOf[StructType])
- val df = inputDF.toDF("temp")
- Seq(
- (
- df.withColumnRenamed("temp", "a"),
- "a", // zero nesting
- (x: Any) => x),
- (
- df.withColumn("a", struct(df("temp") as "b")).drop("temp"),
- "a.b", // one level nesting
- (x: Any) => Row(x)),
- (
- df.withColumn("a", struct(struct(df("temp") as "c") as
"b")).drop("temp"),
- "a.b.c", // two level nesting
- (x: Any) => Row(Row(x))
- ),
- (
- df.withColumnRenamed("temp", "a.b"),
- "`a.b`", // zero nesting with column name containing `dots`
- (x: Any) => x
- ),
- (
- df.withColumn("a.b", struct(df("temp") as "c.d") ).drop("temp"),
- "`a.b`.`c.d`", // one level nesting with column names containing `dots`
- (x: Any) => Row(x)
- )
- ).foreach { case (newDF, colName, resultFun) =>
+ withNestedDataFrame(inputDF).foreach { case (newDF, colName, resultFun) =>
withTempPath { file =>
newDF.write.format(dataSourceName).save(file.getCanonicalPath)
readParquetFile(file.getCanonicalPath) { df => runTest(df, colName,
resultFun) }
diff --git
a/sql/core/v1.2/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
b/sql/core/v1.2/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
index b685639..bc11bb8 100644
---
a/sql/core/v1.2/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
+++
b/sql/core/v1.2/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
@@ -27,7 +27,7 @@ import org.apache.orc.storage.serde2.io.HiveDecimalWritable
import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.util.DateTimeUtils.{instantToMicros,
localDateToDays, toJavaDate, toJavaTimestamp}
-import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.quoteIfNeeded
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types._
@@ -68,11 +68,9 @@ private[sql] object OrcFilters extends OrcFiltersBase {
* Create ORC filter as a SearchArgument instance.
*/
def createFilter(schema: StructType, filters: Seq[Filter]):
Option[SearchArgument] = {
- val dataTypeMap = schema.map(f => quoteIfNeeded(f.name) ->
f.dataType).toMap
+ val dataTypeMap = OrcFilters.getSearchableTypeMap(schema,
SQLConf.get.caseSensitiveAnalysis)
// Combines all convertible filters using `And` to produce a single
conjunction
- // TODO (SPARK-25557): ORC doesn't support nested predicate pushdown, so
they are removed.
- val newFilters = filters.filter(!_.containsNestedColumn)
- val conjunctionOptional = buildTree(convertibleFilters(schema,
dataTypeMap, newFilters))
+ val conjunctionOptional = buildTree(convertibleFilters(schema,
dataTypeMap, filters))
conjunctionOptional.map { conjunction =>
// Then tries to build a single ORC `SearchArgument` for the conjunction
predicate.
// The input predicate is fully convertible. There should not be any
empty result in the
@@ -228,40 +226,38 @@ private[sql] object OrcFilters extends OrcFiltersBase {
// NOTE: For all case branches dealing with leaf predicates below, the
additional `startAnd()`
// call is mandatory. ORC `SearchArgument` builder requires that all leaf
predicates must be
// wrapped by a "parent" predicate (`And`, `Or`, or `Not`).
- // Since ORC 1.5.0 (ORC-323), we need to quote for column names with `.`
characters
- // in order to distinguish predicate pushdown for nested columns.
expression match {
- case EqualTo(name, value) if isSearchableType(dataTypeMap(name)) =>
+ case EqualTo(name, value) if dataTypeMap.contains(name) =>
val castedValue = castLiteralValue(value, dataTypeMap(name))
Some(builder.startAnd().equals(name, getType(name), castedValue).end())
- case EqualNullSafe(name, value) if isSearchableType(dataTypeMap(name)) =>
+ case EqualNullSafe(name, value) if dataTypeMap.contains(name) =>
val castedValue = castLiteralValue(value, dataTypeMap(name))
Some(builder.startAnd().nullSafeEquals(name, getType(name),
castedValue).end())
- case LessThan(name, value) if isSearchableType(dataTypeMap(name)) =>
+ case LessThan(name, value) if dataTypeMap.contains(name) =>
val castedValue = castLiteralValue(value, dataTypeMap(name))
Some(builder.startAnd().lessThan(name, getType(name),
castedValue).end())
- case LessThanOrEqual(name, value) if isSearchableType(dataTypeMap(name))
=>
+ case LessThanOrEqual(name, value) if dataTypeMap.contains(name) =>
val castedValue = castLiteralValue(value, dataTypeMap(name))
Some(builder.startAnd().lessThanEquals(name, getType(name),
castedValue).end())
- case GreaterThan(name, value) if isSearchableType(dataTypeMap(name)) =>
+ case GreaterThan(name, value) if dataTypeMap.contains(name) =>
val castedValue = castLiteralValue(value, dataTypeMap(name))
Some(builder.startNot().lessThanEquals(name, getType(name),
castedValue).end())
- case GreaterThanOrEqual(name, value) if
isSearchableType(dataTypeMap(name)) =>
+ case GreaterThanOrEqual(name, value) if dataTypeMap.contains(name) =>
val castedValue = castLiteralValue(value, dataTypeMap(name))
Some(builder.startNot().lessThan(name, getType(name),
castedValue).end())
- case IsNull(name) if isSearchableType(dataTypeMap(name)) =>
+ case IsNull(name) if dataTypeMap.contains(name) =>
Some(builder.startAnd().isNull(name, getType(name)).end())
- case IsNotNull(name) if isSearchableType(dataTypeMap(name)) =>
+ case IsNotNull(name) if dataTypeMap.contains(name) =>
Some(builder.startNot().isNull(name, getType(name)).end())
- case In(name, values) if isSearchableType(dataTypeMap(name)) =>
+ case In(name, values) if dataTypeMap.contains(name) =>
val castedValues = values.map(v => castLiteralValue(v,
dataTypeMap(name)))
Some(builder.startAnd().in(name, getType(name),
castedValues.map(_.asInstanceOf[AnyRef]): _*).end())
diff --git
a/sql/core/v1.2/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
b/sql/core/v1.2/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
index 88b4b24..2643196 100644
---
a/sql/core/v1.2/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
+++
b/sql/core/v1.2/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
@@ -92,155 +92,199 @@ class OrcFilterSuite extends OrcTest with
SharedSparkSession {
}
test("filter pushdown - integer") {
- withOrcDataFrame((1 to 4).map(i => Tuple1(Option(i)))) { implicit df =>
- checkFilterPredicate($"_1".isNull, PredicateLeaf.Operator.IS_NULL)
-
- checkFilterPredicate($"_1" === 1, PredicateLeaf.Operator.EQUALS)
- checkFilterPredicate($"_1" <=> 1,
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
-
- checkFilterPredicate($"_1" < 2, PredicateLeaf.Operator.LESS_THAN)
- checkFilterPredicate($"_1" > 3, PredicateLeaf.Operator.LESS_THAN_EQUALS)
- checkFilterPredicate($"_1" <= 1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
- checkFilterPredicate($"_1" >= 4, PredicateLeaf.Operator.LESS_THAN)
-
- checkFilterPredicate(Literal(1) === $"_1", PredicateLeaf.Operator.EQUALS)
- checkFilterPredicate(Literal(1) <=> $"_1",
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
- checkFilterPredicate(Literal(2) > $"_1",
PredicateLeaf.Operator.LESS_THAN)
- checkFilterPredicate(Literal(3) < $"_1",
PredicateLeaf.Operator.LESS_THAN_EQUALS)
- checkFilterPredicate(Literal(1) >= $"_1",
PredicateLeaf.Operator.LESS_THAN_EQUALS)
- checkFilterPredicate(Literal(4) <= $"_1",
PredicateLeaf.Operator.LESS_THAN)
+ withNestedOrcDataFrame((1 to 4).map(i => Tuple1(Option(i)))) { case
(inputDF, colName, _) =>
+ implicit val df: DataFrame = inputDF
+
+ val intAttr = df(colName).expr
+ assert(df(colName).expr.dataType === IntegerType)
+
+ checkFilterPredicate(intAttr.isNull, PredicateLeaf.Operator.IS_NULL)
+
+ checkFilterPredicate(intAttr === 1, PredicateLeaf.Operator.EQUALS)
+ checkFilterPredicate(intAttr <=> 1,
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+
+ checkFilterPredicate(intAttr < 2, PredicateLeaf.Operator.LESS_THAN)
+ checkFilterPredicate(intAttr > 3,
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate(intAttr <= 1,
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate(intAttr >= 4, PredicateLeaf.Operator.LESS_THAN)
+
+ checkFilterPredicate(Literal(1) === intAttr,
PredicateLeaf.Operator.EQUALS)
+ checkFilterPredicate(Literal(1) <=> intAttr,
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+ checkFilterPredicate(Literal(2) > intAttr,
PredicateLeaf.Operator.LESS_THAN)
+ checkFilterPredicate(Literal(3) < intAttr,
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate(Literal(1) >= intAttr,
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate(Literal(4) <= intAttr,
PredicateLeaf.Operator.LESS_THAN)
}
}
test("filter pushdown - long") {
- withOrcDataFrame((1 to 4).map(i => Tuple1(Option(i.toLong)))) { implicit
df =>
- checkFilterPredicate($"_1".isNull, PredicateLeaf.Operator.IS_NULL)
-
- checkFilterPredicate($"_1" === 1, PredicateLeaf.Operator.EQUALS)
- checkFilterPredicate($"_1" <=> 1,
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
-
- checkFilterPredicate($"_1" < 2, PredicateLeaf.Operator.LESS_THAN)
- checkFilterPredicate($"_1" > 3, PredicateLeaf.Operator.LESS_THAN_EQUALS)
- checkFilterPredicate($"_1" <= 1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
- checkFilterPredicate($"_1" >= 4, PredicateLeaf.Operator.LESS_THAN)
-
- checkFilterPredicate(Literal(1) === $"_1", PredicateLeaf.Operator.EQUALS)
- checkFilterPredicate(Literal(1) <=> $"_1",
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
- checkFilterPredicate(Literal(2) > $"_1",
PredicateLeaf.Operator.LESS_THAN)
- checkFilterPredicate(Literal(3) < $"_1",
PredicateLeaf.Operator.LESS_THAN_EQUALS)
- checkFilterPredicate(Literal(1) >= $"_1",
PredicateLeaf.Operator.LESS_THAN_EQUALS)
- checkFilterPredicate(Literal(4) <= $"_1",
PredicateLeaf.Operator.LESS_THAN)
+ withNestedOrcDataFrame(
+ (1 to 4).map(i => Tuple1(Option(i.toLong)))) { case (inputDF, colName,
_) =>
+ implicit val df: DataFrame = inputDF
+
+ val longAttr = df(colName).expr
+ assert(df(colName).expr.dataType === LongType)
+
+ checkFilterPredicate(longAttr.isNull, PredicateLeaf.Operator.IS_NULL)
+
+ checkFilterPredicate(longAttr === 1, PredicateLeaf.Operator.EQUALS)
+ checkFilterPredicate(longAttr <=> 1,
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+
+ checkFilterPredicate(longAttr < 2, PredicateLeaf.Operator.LESS_THAN)
+ checkFilterPredicate(longAttr > 3,
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate(longAttr <= 1,
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate(longAttr >= 4, PredicateLeaf.Operator.LESS_THAN)
+
+ checkFilterPredicate(Literal(1) === longAttr,
PredicateLeaf.Operator.EQUALS)
+ checkFilterPredicate(Literal(1) <=> longAttr,
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+ checkFilterPredicate(Literal(2) > longAttr,
PredicateLeaf.Operator.LESS_THAN)
+ checkFilterPredicate(Literal(3) < longAttr,
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate(Literal(1) >= longAttr,
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate(Literal(4) <= longAttr,
PredicateLeaf.Operator.LESS_THAN)
}
}
test("filter pushdown - float") {
- withOrcDataFrame((1 to 4).map(i => Tuple1(Option(i.toFloat)))) { implicit
df =>
- checkFilterPredicate($"_1".isNull, PredicateLeaf.Operator.IS_NULL)
-
- checkFilterPredicate($"_1" === 1, PredicateLeaf.Operator.EQUALS)
- checkFilterPredicate($"_1" <=> 1,
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
-
- checkFilterPredicate($"_1" < 2, PredicateLeaf.Operator.LESS_THAN)
- checkFilterPredicate($"_1" > 3, PredicateLeaf.Operator.LESS_THAN_EQUALS)
- checkFilterPredicate($"_1" <= 1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
- checkFilterPredicate($"_1" >= 4, PredicateLeaf.Operator.LESS_THAN)
-
- checkFilterPredicate(Literal(1) === $"_1", PredicateLeaf.Operator.EQUALS)
- checkFilterPredicate(Literal(1) <=> $"_1",
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
- checkFilterPredicate(Literal(2) > $"_1",
PredicateLeaf.Operator.LESS_THAN)
- checkFilterPredicate(Literal(3) < $"_1",
PredicateLeaf.Operator.LESS_THAN_EQUALS)
- checkFilterPredicate(Literal(1) >= $"_1",
PredicateLeaf.Operator.LESS_THAN_EQUALS)
- checkFilterPredicate(Literal(4) <= $"_1",
PredicateLeaf.Operator.LESS_THAN)
+ withNestedOrcDataFrame(
+ (1 to 4).map(i => Tuple1(Option(i.toFloat)))) { case (inputDF,
colName, _) =>
+ implicit val df: DataFrame = inputDF
+
+ val floatAttr = df(colName).expr
+ assert(df(colName).expr.dataType === FloatType)
+
+ checkFilterPredicate(floatAttr.isNull, PredicateLeaf.Operator.IS_NULL)
+
+ checkFilterPredicate(floatAttr === 1, PredicateLeaf.Operator.EQUALS)
+ checkFilterPredicate(floatAttr <=> 1,
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+
+ checkFilterPredicate(floatAttr < 2, PredicateLeaf.Operator.LESS_THAN)
+ checkFilterPredicate(floatAttr > 3,
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate(floatAttr <= 1,
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate(floatAttr >= 4, PredicateLeaf.Operator.LESS_THAN)
+
+ checkFilterPredicate(Literal(1) === floatAttr,
PredicateLeaf.Operator.EQUALS)
+ checkFilterPredicate(Literal(1) <=> floatAttr,
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+ checkFilterPredicate(Literal(2) > floatAttr,
PredicateLeaf.Operator.LESS_THAN)
+ checkFilterPredicate(Literal(3) < floatAttr,
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate(Literal(1) >= floatAttr,
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate(Literal(4) <= floatAttr,
PredicateLeaf.Operator.LESS_THAN)
}
}
test("filter pushdown - double") {
- withOrcDataFrame((1 to 4).map(i => Tuple1(Option(i.toDouble)))) { implicit
df =>
- checkFilterPredicate($"_1".isNull, PredicateLeaf.Operator.IS_NULL)
-
- checkFilterPredicate($"_1" === 1, PredicateLeaf.Operator.EQUALS)
- checkFilterPredicate($"_1" <=> 1,
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
-
- checkFilterPredicate($"_1" < 2, PredicateLeaf.Operator.LESS_THAN)
- checkFilterPredicate($"_1" > 3, PredicateLeaf.Operator.LESS_THAN_EQUALS)
- checkFilterPredicate($"_1" <= 1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
- checkFilterPredicate($"_1" >= 4, PredicateLeaf.Operator.LESS_THAN)
-
- checkFilterPredicate(Literal(1) === $"_1", PredicateLeaf.Operator.EQUALS)
- checkFilterPredicate(Literal(1) <=> $"_1",
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
- checkFilterPredicate(Literal(2) > $"_1",
PredicateLeaf.Operator.LESS_THAN)
- checkFilterPredicate(Literal(3) < $"_1",
PredicateLeaf.Operator.LESS_THAN_EQUALS)
- checkFilterPredicate(Literal(1) >= $"_1",
PredicateLeaf.Operator.LESS_THAN_EQUALS)
- checkFilterPredicate(Literal(4) <= $"_1",
PredicateLeaf.Operator.LESS_THAN)
+ withNestedOrcDataFrame(
+ (1 to 4).map(i => Tuple1(Option(i.toDouble)))) { case (inputDF,
colName, _) =>
+ implicit val df: DataFrame = inputDF
+
+ val doubleAttr = df(colName).expr
+ assert(df(colName).expr.dataType === DoubleType)
+
+ checkFilterPredicate(doubleAttr.isNull, PredicateLeaf.Operator.IS_NULL)
+
+ checkFilterPredicate(doubleAttr === 1, PredicateLeaf.Operator.EQUALS)
+ checkFilterPredicate(doubleAttr <=> 1,
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+
+ checkFilterPredicate(doubleAttr < 2, PredicateLeaf.Operator.LESS_THAN)
+ checkFilterPredicate(doubleAttr > 3,
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate(doubleAttr <= 1,
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate(doubleAttr >= 4, PredicateLeaf.Operator.LESS_THAN)
+
+ checkFilterPredicate(Literal(1) === doubleAttr,
PredicateLeaf.Operator.EQUALS)
+ checkFilterPredicate(Literal(1) <=> doubleAttr,
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+ checkFilterPredicate(Literal(2) > doubleAttr,
PredicateLeaf.Operator.LESS_THAN)
+ checkFilterPredicate(Literal(3) < doubleAttr,
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate(Literal(1) >= doubleAttr,
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate(Literal(4) <= doubleAttr,
PredicateLeaf.Operator.LESS_THAN)
}
}
test("filter pushdown - string") {
- withOrcDataFrame((1 to 4).map(i => Tuple1(i.toString))) { implicit df =>
- checkFilterPredicate($"_1".isNull, PredicateLeaf.Operator.IS_NULL)
-
- checkFilterPredicate($"_1" === "1", PredicateLeaf.Operator.EQUALS)
- checkFilterPredicate($"_1" <=> "1",
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
-
- checkFilterPredicate($"_1" < "2", PredicateLeaf.Operator.LESS_THAN)
- checkFilterPredicate($"_1" > "3",
PredicateLeaf.Operator.LESS_THAN_EQUALS)
- checkFilterPredicate($"_1" <= "1",
PredicateLeaf.Operator.LESS_THAN_EQUALS)
- checkFilterPredicate($"_1" >= "4", PredicateLeaf.Operator.LESS_THAN)
-
- checkFilterPredicate(Literal("1") === $"_1",
PredicateLeaf.Operator.EQUALS)
- checkFilterPredicate(Literal("1") <=> $"_1",
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
- checkFilterPredicate(Literal("2") > $"_1",
PredicateLeaf.Operator.LESS_THAN)
- checkFilterPredicate(Literal("3") < $"_1",
PredicateLeaf.Operator.LESS_THAN_EQUALS)
- checkFilterPredicate(Literal("1") >= $"_1",
PredicateLeaf.Operator.LESS_THAN_EQUALS)
- checkFilterPredicate(Literal("4") <= $"_1",
PredicateLeaf.Operator.LESS_THAN)
+ withNestedOrcDataFrame((1 to 4).map(i => Tuple1(i.toString))) { case
(inputDF, colName, _) =>
+ implicit val df: DataFrame = inputDF
+
+ val strAttr = df(colName).expr
+ assert(df(colName).expr.dataType === StringType)
+
+ checkFilterPredicate(strAttr.isNull, PredicateLeaf.Operator.IS_NULL)
+
+ checkFilterPredicate(strAttr === "1", PredicateLeaf.Operator.EQUALS)
+ checkFilterPredicate(strAttr <=> "1",
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+
+ checkFilterPredicate(strAttr < "2", PredicateLeaf.Operator.LESS_THAN)
+ checkFilterPredicate(strAttr > "3",
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate(strAttr <= "1",
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate(strAttr >= "4", PredicateLeaf.Operator.LESS_THAN)
+
+ checkFilterPredicate(Literal("1") === strAttr,
PredicateLeaf.Operator.EQUALS)
+ checkFilterPredicate(Literal("1") <=> strAttr,
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+ checkFilterPredicate(Literal("2") > strAttr,
PredicateLeaf.Operator.LESS_THAN)
+ checkFilterPredicate(Literal("3") < strAttr,
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate(Literal("1") >= strAttr,
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate(Literal("4") <= strAttr,
PredicateLeaf.Operator.LESS_THAN)
}
}
test("filter pushdown - boolean") {
- withOrcDataFrame((true :: false :: Nil).map(b => Tuple1.apply(Option(b))))
{ implicit df =>
- checkFilterPredicate($"_1".isNull, PredicateLeaf.Operator.IS_NULL)
-
- checkFilterPredicate($"_1" === true, PredicateLeaf.Operator.EQUALS)
- checkFilterPredicate($"_1" <=> true,
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
-
- checkFilterPredicate($"_1" < true, PredicateLeaf.Operator.LESS_THAN)
- checkFilterPredicate($"_1" > false,
PredicateLeaf.Operator.LESS_THAN_EQUALS)
- checkFilterPredicate($"_1" <= false,
PredicateLeaf.Operator.LESS_THAN_EQUALS)
- checkFilterPredicate($"_1" >= false, PredicateLeaf.Operator.LESS_THAN)
-
- checkFilterPredicate(Literal(false) === $"_1",
PredicateLeaf.Operator.EQUALS)
- checkFilterPredicate(Literal(false) <=> $"_1",
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
- checkFilterPredicate(Literal(false) > $"_1",
PredicateLeaf.Operator.LESS_THAN)
- checkFilterPredicate(Literal(true) < $"_1",
PredicateLeaf.Operator.LESS_THAN_EQUALS)
- checkFilterPredicate(Literal(true) >= $"_1",
PredicateLeaf.Operator.LESS_THAN_EQUALS)
- checkFilterPredicate(Literal(true) <= $"_1",
PredicateLeaf.Operator.LESS_THAN)
+ withNestedOrcDataFrame(
+ (true :: false :: Nil).map(b => Tuple1.apply(Option(b)))) { case
(inputDF, colName, _) =>
+ implicit val df: DataFrame = inputDF
+
+ val booleanAttr = df(colName).expr
+ assert(df(colName).expr.dataType === BooleanType)
+
+ checkFilterPredicate(booleanAttr.isNull, PredicateLeaf.Operator.IS_NULL)
+
+ checkFilterPredicate(booleanAttr === true, PredicateLeaf.Operator.EQUALS)
+ checkFilterPredicate(booleanAttr <=> true,
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+
+ checkFilterPredicate(booleanAttr < true,
PredicateLeaf.Operator.LESS_THAN)
+ checkFilterPredicate(booleanAttr > false,
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate(booleanAttr <= false,
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate(booleanAttr >= false,
PredicateLeaf.Operator.LESS_THAN)
+
+ checkFilterPredicate(Literal(false) === booleanAttr,
PredicateLeaf.Operator.EQUALS)
+ checkFilterPredicate(Literal(false) <=> booleanAttr,
+ PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+ checkFilterPredicate(Literal(false) > booleanAttr,
PredicateLeaf.Operator.LESS_THAN)
+ checkFilterPredicate(Literal(true) < booleanAttr,
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate(Literal(true) >= booleanAttr,
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate(Literal(true) <= booleanAttr,
PredicateLeaf.Operator.LESS_THAN)
}
}
test("filter pushdown - decimal") {
- withOrcDataFrame((1 to 4).map(i => Tuple1.apply(BigDecimal.valueOf(i)))) {
implicit df =>
- checkFilterPredicate($"_1".isNull, PredicateLeaf.Operator.IS_NULL)
+ withNestedOrcDataFrame(
+ (1 to 4).map(i => Tuple1.apply(BigDecimal.valueOf(i)))) { case
(inputDF, colName, _) =>
+ implicit val df: DataFrame = inputDF
+
+ val decimalAttr = df(colName).expr
+ assert(df(colName).expr.dataType === DecimalType(38, 18))
+
+ checkFilterPredicate(decimalAttr.isNull, PredicateLeaf.Operator.IS_NULL)
- checkFilterPredicate($"_1" === BigDecimal.valueOf(1),
PredicateLeaf.Operator.EQUALS)
- checkFilterPredicate($"_1" <=> BigDecimal.valueOf(1),
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+ checkFilterPredicate(decimalAttr === BigDecimal.valueOf(1),
PredicateLeaf.Operator.EQUALS)
+ checkFilterPredicate(decimalAttr <=> BigDecimal.valueOf(1),
+ PredicateLeaf.Operator.NULL_SAFE_EQUALS)
- checkFilterPredicate($"_1" < BigDecimal.valueOf(2),
PredicateLeaf.Operator.LESS_THAN)
- checkFilterPredicate($"_1" > BigDecimal.valueOf(3),
PredicateLeaf.Operator.LESS_THAN_EQUALS)
- checkFilterPredicate($"_1" <= BigDecimal.valueOf(1),
PredicateLeaf.Operator.LESS_THAN_EQUALS)
- checkFilterPredicate($"_1" >= BigDecimal.valueOf(4),
PredicateLeaf.Operator.LESS_THAN)
+ checkFilterPredicate(decimalAttr < BigDecimal.valueOf(2),
PredicateLeaf.Operator.LESS_THAN)
+ checkFilterPredicate(decimalAttr > BigDecimal.valueOf(3),
+ PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate(decimalAttr <= BigDecimal.valueOf(1),
+ PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate(decimalAttr >= BigDecimal.valueOf(4),
PredicateLeaf.Operator.LESS_THAN)
checkFilterPredicate(
- Literal(BigDecimal.valueOf(1)) === $"_1",
PredicateLeaf.Operator.EQUALS)
+ Literal(BigDecimal.valueOf(1)) === decimalAttr,
PredicateLeaf.Operator.EQUALS)
checkFilterPredicate(
- Literal(BigDecimal.valueOf(1)) <=> $"_1",
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+ Literal(BigDecimal.valueOf(1)) <=> decimalAttr,
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
checkFilterPredicate(
- Literal(BigDecimal.valueOf(2)) > $"_1",
PredicateLeaf.Operator.LESS_THAN)
+ Literal(BigDecimal.valueOf(2)) > decimalAttr,
PredicateLeaf.Operator.LESS_THAN)
checkFilterPredicate(
- Literal(BigDecimal.valueOf(3)) < $"_1",
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ Literal(BigDecimal.valueOf(3)) < decimalAttr,
PredicateLeaf.Operator.LESS_THAN_EQUALS)
checkFilterPredicate(
- Literal(BigDecimal.valueOf(1)) >= $"_1",
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ Literal(BigDecimal.valueOf(1)) >= decimalAttr,
PredicateLeaf.Operator.LESS_THAN_EQUALS)
checkFilterPredicate(
- Literal(BigDecimal.valueOf(4)) <= $"_1",
PredicateLeaf.Operator.LESS_THAN)
+ Literal(BigDecimal.valueOf(4)) <= decimalAttr,
PredicateLeaf.Operator.LESS_THAN)
}
}
diff --git a/sql/core/v2.3/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala b/sql/core/v2.3/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
index 4b64208..5273245 100644
--- a/sql/core/v2.3/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
+++ b/sql/core/v2.3/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
@@ -27,7 +27,7 @@ import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable
import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.util.DateTimeUtils.{instantToMicros,
localDateToDays, toJavaDate, toJavaTimestamp}
-import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.quoteIfNeeded
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types._
@@ -68,11 +68,9 @@ private[sql] object OrcFilters extends OrcFiltersBase {
* Create ORC filter as a SearchArgument instance.
*/
def createFilter(schema: StructType, filters: Seq[Filter]):
Option[SearchArgument] = {
- val dataTypeMap = schema.map(f => quoteIfNeeded(f.name) ->
f.dataType).toMap
+ val dataTypeMap = OrcFilters.getSearchableTypeMap(schema,
SQLConf.get.caseSensitiveAnalysis)
// Combines all convertible filters using `And` to produce a single
conjunction
- // TODO (SPARK-25557): ORC doesn't support nested predicate pushdown, so
they are removed.
- val newFilters = filters.filter(!_.containsNestedColumn)
- val conjunctionOptional = buildTree(convertibleFilters(schema,
dataTypeMap, newFilters))
+ val conjunctionOptional = buildTree(convertibleFilters(schema,
dataTypeMap, filters))
conjunctionOptional.map { conjunction =>
// Then tries to build a single ORC `SearchArgument` for the conjunction
predicate.
// The input predicate is fully convertible. There should not be any
empty result in the
@@ -228,40 +226,38 @@ private[sql] object OrcFilters extends OrcFiltersBase {
// NOTE: For all case branches dealing with leaf predicates below, the
additional `startAnd()`
// call is mandatory. ORC `SearchArgument` builder requires that all leaf
predicates must be
// wrapped by a "parent" predicate (`And`, `Or`, or `Not`).
- // Since ORC 1.5.0 (ORC-323), we need to quote for column names with `.`
characters
- // in order to distinguish predicate pushdown for nested columns.
expression match {
- case EqualTo(name, value) if isSearchableType(dataTypeMap(name)) =>
+ case EqualTo(name, value) if dataTypeMap.contains(name) =>
val castedValue = castLiteralValue(value, dataTypeMap(name))
Some(builder.startAnd().equals(name, getType(name), castedValue).end())
- case EqualNullSafe(name, value) if isSearchableType(dataTypeMap(name)) =>
+ case EqualNullSafe(name, value) if dataTypeMap.contains(name) =>
val castedValue = castLiteralValue(value, dataTypeMap(name))
Some(builder.startAnd().nullSafeEquals(name, getType(name),
castedValue).end())
- case LessThan(name, value) if isSearchableType(dataTypeMap(name)) =>
+ case LessThan(name, value) if dataTypeMap.contains(name) =>
val castedValue = castLiteralValue(value, dataTypeMap(name))
Some(builder.startAnd().lessThan(name, getType(name),
castedValue).end())
- case LessThanOrEqual(name, value) if isSearchableType(dataTypeMap(name))
=>
+ case LessThanOrEqual(name, value) if dataTypeMap.contains(name) =>
val castedValue = castLiteralValue(value, dataTypeMap(name))
Some(builder.startAnd().lessThanEquals(name, getType(name),
castedValue).end())
- case GreaterThan(name, value) if isSearchableType(dataTypeMap(name)) =>
+ case GreaterThan(name, value) if dataTypeMap.contains(name) =>
val castedValue = castLiteralValue(value, dataTypeMap(name))
Some(builder.startNot().lessThanEquals(name, getType(name),
castedValue).end())
- case GreaterThanOrEqual(name, value) if
isSearchableType(dataTypeMap(name)) =>
+ case GreaterThanOrEqual(name, value) if dataTypeMap.contains(name) =>
val castedValue = castLiteralValue(value, dataTypeMap(name))
Some(builder.startNot().lessThan(name, getType(name),
castedValue).end())
- case IsNull(name) if isSearchableType(dataTypeMap(name)) =>
+ case IsNull(name) if dataTypeMap.contains(name) =>
Some(builder.startAnd().isNull(name, getType(name)).end())
- case IsNotNull(name) if isSearchableType(dataTypeMap(name)) =>
+ case IsNotNull(name) if dataTypeMap.contains(name) =>
Some(builder.startNot().isNull(name, getType(name)).end())
- case In(name, values) if isSearchableType(dataTypeMap(name)) =>
+ case In(name, values) if dataTypeMap.contains(name) =>
val castedValues = values.map(v => castLiteralValue(v,
dataTypeMap(name)))
Some(builder.startAnd().in(name, getType(name),
castedValues.map(_.asInstanceOf[AnyRef]): _*).end())
diff --git a/sql/core/v2.3/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala b/sql/core/v2.3/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
index 2263179..7df9f29 100644
--- a/sql/core/v2.3/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
+++ b/sql/core/v2.3/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
@@ -30,9 +30,8 @@ import org.apache.spark.sql.{AnalysisException, Column, DataFrame}
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
-import org.apache.spark.sql.execution.datasources.{DataSourceStrategy,
HadoopFsRelation, LogicalRelation}
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation,
DataSourceV2ScanRelation}
-import org.apache.spark.sql.execution.datasources.v2.orc.{OrcScan, OrcTable}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
+import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types._
@@ -93,155 +92,200 @@ class OrcFilterSuite extends OrcTest with SharedSparkSession {
}
test("filter pushdown - integer") {
- withOrcDataFrame((1 to 4).map(i => Tuple1(Option(i)))) { implicit df =>
- checkFilterPredicate($"_1".isNull, PredicateLeaf.Operator.IS_NULL)
-
- checkFilterPredicate($"_1" === 1, PredicateLeaf.Operator.EQUALS)
- checkFilterPredicate($"_1" <=> 1,
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
-
- checkFilterPredicate($"_1" < 2, PredicateLeaf.Operator.LESS_THAN)
- checkFilterPredicate($"_1" > 3, PredicateLeaf.Operator.LESS_THAN_EQUALS)
- checkFilterPredicate($"_1" <= 1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
- checkFilterPredicate($"_1" >= 4, PredicateLeaf.Operator.LESS_THAN)
-
- checkFilterPredicate(Literal(1) === $"_1", PredicateLeaf.Operator.EQUALS)
- checkFilterPredicate(Literal(1) <=> $"_1",
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
- checkFilterPredicate(Literal(2) > $"_1",
PredicateLeaf.Operator.LESS_THAN)
- checkFilterPredicate(Literal(3) < $"_1",
PredicateLeaf.Operator.LESS_THAN_EQUALS)
- checkFilterPredicate(Literal(1) >= $"_1",
PredicateLeaf.Operator.LESS_THAN_EQUALS)
- checkFilterPredicate(Literal(4) <= $"_1",
PredicateLeaf.Operator.LESS_THAN)
+ withNestedOrcDataFrame((1 to 4).map(i => Tuple1(Option(i)))) { case
(inputDF, colName, _) =>
+ implicit val df: DataFrame = inputDF
+
+ val intAttr = df(colName).expr
+ assert(df(colName).expr.dataType === IntegerType)
+
+ checkFilterPredicate(intAttr.isNull, PredicateLeaf.Operator.IS_NULL)
+
+ checkFilterPredicate(intAttr === 1, PredicateLeaf.Operator.EQUALS)
+ checkFilterPredicate(intAttr <=> 1,
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+
+ checkFilterPredicate(intAttr < 2, PredicateLeaf.Operator.LESS_THAN)
+ checkFilterPredicate(intAttr > 3,
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate(intAttr <= 1,
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate(intAttr >= 4, PredicateLeaf.Operator.LESS_THAN)
+
+ checkFilterPredicate(Literal(1) === intAttr,
PredicateLeaf.Operator.EQUALS)
+ checkFilterPredicate(Literal(1) <=> intAttr,
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+ checkFilterPredicate(Literal(2) > intAttr,
PredicateLeaf.Operator.LESS_THAN)
+ checkFilterPredicate(Literal(3) < intAttr,
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate(Literal(1) >= intAttr,
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate(Literal(4) <= intAttr,
PredicateLeaf.Operator.LESS_THAN)
}
}
test("filter pushdown - long") {
- withOrcDataFrame((1 to 4).map(i => Tuple1(Option(i.toLong)))) { implicit
df =>
- checkFilterPredicate($"_1".isNull, PredicateLeaf.Operator.IS_NULL)
-
- checkFilterPredicate($"_1" === 1, PredicateLeaf.Operator.EQUALS)
- checkFilterPredicate($"_1" <=> 1,
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
-
- checkFilterPredicate($"_1" < 2, PredicateLeaf.Operator.LESS_THAN)
- checkFilterPredicate($"_1" > 3, PredicateLeaf.Operator.LESS_THAN_EQUALS)
- checkFilterPredicate($"_1" <= 1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
- checkFilterPredicate($"_1" >= 4, PredicateLeaf.Operator.LESS_THAN)
-
- checkFilterPredicate(Literal(1) === $"_1", PredicateLeaf.Operator.EQUALS)
- checkFilterPredicate(Literal(1) <=> $"_1",
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
- checkFilterPredicate(Literal(2) > $"_1",
PredicateLeaf.Operator.LESS_THAN)
- checkFilterPredicate(Literal(3) < $"_1",
PredicateLeaf.Operator.LESS_THAN_EQUALS)
- checkFilterPredicate(Literal(1) >= $"_1",
PredicateLeaf.Operator.LESS_THAN_EQUALS)
- checkFilterPredicate(Literal(4) <= $"_1",
PredicateLeaf.Operator.LESS_THAN)
+ withNestedOrcDataFrame(
+ (1 to 4).map(i => Tuple1(Option(i.toLong)))) { case (inputDF, colName,
_) =>
+ implicit val df: DataFrame = inputDF
+
+ val longAttr = df(colName).expr
+ assert(df(colName).expr.dataType === LongType)
+
+ checkFilterPredicate(longAttr.isNull, PredicateLeaf.Operator.IS_NULL)
+
+ checkFilterPredicate(longAttr === 1, PredicateLeaf.Operator.EQUALS)
+ checkFilterPredicate(longAttr <=> 1,
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+
+ checkFilterPredicate(longAttr < 2, PredicateLeaf.Operator.LESS_THAN)
+ checkFilterPredicate(longAttr > 3,
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate(longAttr <= 1,
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate(longAttr >= 4, PredicateLeaf.Operator.LESS_THAN)
+
+ checkFilterPredicate(Literal(1) === longAttr,
PredicateLeaf.Operator.EQUALS)
+ checkFilterPredicate(Literal(1) <=> longAttr,
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+ checkFilterPredicate(Literal(2) > longAttr,
PredicateLeaf.Operator.LESS_THAN)
+ checkFilterPredicate(Literal(3) < longAttr,
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate(Literal(1) >= longAttr,
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate(Literal(4) <= longAttr,
PredicateLeaf.Operator.LESS_THAN)
}
}
test("filter pushdown - float") {
- withOrcDataFrame((1 to 4).map(i => Tuple1(Option(i.toFloat)))) { implicit
df =>
- checkFilterPredicate($"_1".isNull, PredicateLeaf.Operator.IS_NULL)
-
- checkFilterPredicate($"_1" === 1, PredicateLeaf.Operator.EQUALS)
- checkFilterPredicate($"_1" <=> 1,
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
-
- checkFilterPredicate($"_1" < 2, PredicateLeaf.Operator.LESS_THAN)
- checkFilterPredicate($"_1" > 3, PredicateLeaf.Operator.LESS_THAN_EQUALS)
- checkFilterPredicate($"_1" <= 1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
- checkFilterPredicate($"_1" >= 4, PredicateLeaf.Operator.LESS_THAN)
-
- checkFilterPredicate(Literal(1) === $"_1", PredicateLeaf.Operator.EQUALS)
- checkFilterPredicate(Literal(1) <=> $"_1",
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
- checkFilterPredicate(Literal(2) > $"_1",
PredicateLeaf.Operator.LESS_THAN)
- checkFilterPredicate(Literal(3) < $"_1",
PredicateLeaf.Operator.LESS_THAN_EQUALS)
- checkFilterPredicate(Literal(1) >= $"_1",
PredicateLeaf.Operator.LESS_THAN_EQUALS)
- checkFilterPredicate(Literal(4) <= $"_1",
PredicateLeaf.Operator.LESS_THAN)
+ withNestedOrcDataFrame(
+ (1 to 4).map(i => Tuple1(Option(i.toFloat)))) { case (inputDF,
colName, _) =>
+ implicit val df: DataFrame = inputDF
+
+ val floatAttr = df(colName).expr
+ assert(df(colName).expr.dataType === FloatType)
+
+ checkFilterPredicate(floatAttr.isNull, PredicateLeaf.Operator.IS_NULL)
+
+ checkFilterPredicate(floatAttr === 1, PredicateLeaf.Operator.EQUALS)
+ checkFilterPredicate(floatAttr <=> 1,
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+
+ checkFilterPredicate(floatAttr < 2, PredicateLeaf.Operator.LESS_THAN)
+ checkFilterPredicate(floatAttr > 3,
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate(floatAttr <= 1,
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate(floatAttr >= 4, PredicateLeaf.Operator.LESS_THAN)
+
+ checkFilterPredicate(Literal(1) === floatAttr,
PredicateLeaf.Operator.EQUALS)
+ checkFilterPredicate(Literal(1) <=> floatAttr,
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+ checkFilterPredicate(Literal(2) > floatAttr,
PredicateLeaf.Operator.LESS_THAN)
+ checkFilterPredicate(Literal(3) < floatAttr,
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate(Literal(1) >= floatAttr,
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate(Literal(4) <= floatAttr,
PredicateLeaf.Operator.LESS_THAN)
}
}
test("filter pushdown - double") {
- withOrcDataFrame((1 to 4).map(i => Tuple1(Option(i.toDouble)))) { implicit
df =>
- checkFilterPredicate($"_1".isNull, PredicateLeaf.Operator.IS_NULL)
-
- checkFilterPredicate($"_1" === 1, PredicateLeaf.Operator.EQUALS)
- checkFilterPredicate($"_1" <=> 1,
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
-
- checkFilterPredicate($"_1" < 2, PredicateLeaf.Operator.LESS_THAN)
- checkFilterPredicate($"_1" > 3, PredicateLeaf.Operator.LESS_THAN_EQUALS)
- checkFilterPredicate($"_1" <= 1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
- checkFilterPredicate($"_1" >= 4, PredicateLeaf.Operator.LESS_THAN)
-
- checkFilterPredicate(Literal(1) === $"_1", PredicateLeaf.Operator.EQUALS)
- checkFilterPredicate(Literal(1) <=> $"_1",
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
- checkFilterPredicate(Literal(2) > $"_1",
PredicateLeaf.Operator.LESS_THAN)
- checkFilterPredicate(Literal(3) < $"_1",
PredicateLeaf.Operator.LESS_THAN_EQUALS)
- checkFilterPredicate(Literal(1) >= $"_1",
PredicateLeaf.Operator.LESS_THAN_EQUALS)
- checkFilterPredicate(Literal(4) <= $"_1",
PredicateLeaf.Operator.LESS_THAN)
+ withNestedOrcDataFrame(
+ (1 to 4).map(i => Tuple1(Option(i.toDouble)))) { case (inputDF,
colName, _) =>
+ implicit val df: DataFrame = inputDF
+
+ val doubleAttr = df(colName).expr
+ assert(df(colName).expr.dataType === DoubleType)
+
+ checkFilterPredicate(doubleAttr.isNull, PredicateLeaf.Operator.IS_NULL)
+
+ checkFilterPredicate(doubleAttr === 1, PredicateLeaf.Operator.EQUALS)
+ checkFilterPredicate(doubleAttr <=> 1,
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+
+ checkFilterPredicate(doubleAttr < 2, PredicateLeaf.Operator.LESS_THAN)
+ checkFilterPredicate(doubleAttr > 3,
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate(doubleAttr <= 1,
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate(doubleAttr >= 4, PredicateLeaf.Operator.LESS_THAN)
+
+ checkFilterPredicate(Literal(1) === doubleAttr,
PredicateLeaf.Operator.EQUALS)
+ checkFilterPredicate(Literal(1) <=> doubleAttr,
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+ checkFilterPredicate(Literal(2) > doubleAttr,
PredicateLeaf.Operator.LESS_THAN)
+ checkFilterPredicate(Literal(3) < doubleAttr,
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate(Literal(1) >= doubleAttr,
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate(Literal(4) <= doubleAttr,
PredicateLeaf.Operator.LESS_THAN)
}
}
test("filter pushdown - string") {
- withOrcDataFrame((1 to 4).map(i => Tuple1(i.toString))) { implicit df =>
- checkFilterPredicate($"_1".isNull, PredicateLeaf.Operator.IS_NULL)
-
- checkFilterPredicate($"_1" === "1", PredicateLeaf.Operator.EQUALS)
- checkFilterPredicate($"_1" <=> "1",
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
-
- checkFilterPredicate($"_1" < "2", PredicateLeaf.Operator.LESS_THAN)
- checkFilterPredicate($"_1" > "3",
PredicateLeaf.Operator.LESS_THAN_EQUALS)
- checkFilterPredicate($"_1" <= "1",
PredicateLeaf.Operator.LESS_THAN_EQUALS)
- checkFilterPredicate($"_1" >= "4", PredicateLeaf.Operator.LESS_THAN)
-
- checkFilterPredicate(Literal("1") === $"_1",
PredicateLeaf.Operator.EQUALS)
- checkFilterPredicate(Literal("1") <=> $"_1",
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
- checkFilterPredicate(Literal("2") > $"_1",
PredicateLeaf.Operator.LESS_THAN)
- checkFilterPredicate(Literal("3") < $"_1",
PredicateLeaf.Operator.LESS_THAN_EQUALS)
- checkFilterPredicate(Literal("1") >= $"_1",
PredicateLeaf.Operator.LESS_THAN_EQUALS)
- checkFilterPredicate(Literal("4") <= $"_1",
PredicateLeaf.Operator.LESS_THAN)
+ withNestedOrcDataFrame(
+ (1 to 4).map(i => Tuple1(i.toString))) { case (inputDF, colName, _) =>
+ implicit val df: DataFrame = inputDF
+
+ val strAttr = df(colName).expr
+ assert(df(colName).expr.dataType === StringType)
+
+ checkFilterPredicate(strAttr.isNull, PredicateLeaf.Operator.IS_NULL)
+
+ checkFilterPredicate(strAttr === "1", PredicateLeaf.Operator.EQUALS)
+ checkFilterPredicate(strAttr <=> "1",
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+
+ checkFilterPredicate(strAttr < "2", PredicateLeaf.Operator.LESS_THAN)
+ checkFilterPredicate(strAttr > "3",
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate(strAttr <= "1",
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate(strAttr >= "4", PredicateLeaf.Operator.LESS_THAN)
+
+ checkFilterPredicate(Literal("1") === strAttr,
PredicateLeaf.Operator.EQUALS)
+ checkFilterPredicate(Literal("1") <=> strAttr,
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+ checkFilterPredicate(Literal("2") > strAttr,
PredicateLeaf.Operator.LESS_THAN)
+ checkFilterPredicate(Literal("3") < strAttr,
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate(Literal("1") >= strAttr,
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate(Literal("4") <= strAttr,
PredicateLeaf.Operator.LESS_THAN)
}
}
test("filter pushdown - boolean") {
- withOrcDataFrame((true :: false :: Nil).map(b => Tuple1.apply(Option(b))))
{ implicit df =>
- checkFilterPredicate($"_1".isNull, PredicateLeaf.Operator.IS_NULL)
-
- checkFilterPredicate($"_1" === true, PredicateLeaf.Operator.EQUALS)
- checkFilterPredicate($"_1" <=> true,
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
-
- checkFilterPredicate($"_1" < true, PredicateLeaf.Operator.LESS_THAN)
- checkFilterPredicate($"_1" > false,
PredicateLeaf.Operator.LESS_THAN_EQUALS)
- checkFilterPredicate($"_1" <= false,
PredicateLeaf.Operator.LESS_THAN_EQUALS)
- checkFilterPredicate($"_1" >= false, PredicateLeaf.Operator.LESS_THAN)
-
- checkFilterPredicate(Literal(false) === $"_1",
PredicateLeaf.Operator.EQUALS)
- checkFilterPredicate(Literal(false) <=> $"_1",
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
- checkFilterPredicate(Literal(false) > $"_1",
PredicateLeaf.Operator.LESS_THAN)
- checkFilterPredicate(Literal(true) < $"_1",
PredicateLeaf.Operator.LESS_THAN_EQUALS)
- checkFilterPredicate(Literal(true) >= $"_1",
PredicateLeaf.Operator.LESS_THAN_EQUALS)
- checkFilterPredicate(Literal(true) <= $"_1",
PredicateLeaf.Operator.LESS_THAN)
+ withNestedOrcDataFrame(
+ (true :: false :: Nil).map(b => Tuple1.apply(Option(b)))) { case
(inputDF, colName, _) =>
+ implicit val df: DataFrame = inputDF
+
+ val booleanAttr = df(colName).expr
+ assert(df(colName).expr.dataType === BooleanType)
+
+ checkFilterPredicate(booleanAttr.isNull, PredicateLeaf.Operator.IS_NULL)
+
+ checkFilterPredicate(booleanAttr === true, PredicateLeaf.Operator.EQUALS)
+ checkFilterPredicate(booleanAttr <=> true,
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+
+ checkFilterPredicate(booleanAttr < true,
PredicateLeaf.Operator.LESS_THAN)
+ checkFilterPredicate(booleanAttr > false,
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate(booleanAttr <= false,
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate(booleanAttr >= false,
PredicateLeaf.Operator.LESS_THAN)
+
+ checkFilterPredicate(Literal(false) === booleanAttr,
PredicateLeaf.Operator.EQUALS)
+ checkFilterPredicate(Literal(false) <=> booleanAttr,
+ PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+ checkFilterPredicate(Literal(false) > booleanAttr,
PredicateLeaf.Operator.LESS_THAN)
+ checkFilterPredicate(Literal(true) < booleanAttr,
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate(Literal(true) >= booleanAttr,
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate(Literal(true) <= booleanAttr,
PredicateLeaf.Operator.LESS_THAN)
}
}
test("filter pushdown - decimal") {
- withOrcDataFrame((1 to 4).map(i => Tuple1.apply(BigDecimal.valueOf(i)))) {
implicit df =>
- checkFilterPredicate($"_1".isNull, PredicateLeaf.Operator.IS_NULL)
+ withNestedOrcDataFrame(
+ (1 to 4).map(i => Tuple1.apply(BigDecimal.valueOf(i)))) { case
(inputDF, colName, _) =>
+ implicit val df: DataFrame = inputDF
+
+ val decimalAttr = df(colName).expr
+ assert(df(colName).expr.dataType === DecimalType(38, 18))
+
+ checkFilterPredicate(decimalAttr.isNull, PredicateLeaf.Operator.IS_NULL)
- checkFilterPredicate($"_1" === BigDecimal.valueOf(1),
PredicateLeaf.Operator.EQUALS)
- checkFilterPredicate($"_1" <=> BigDecimal.valueOf(1),
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+ checkFilterPredicate(decimalAttr === BigDecimal.valueOf(1),
PredicateLeaf.Operator.EQUALS)
+ checkFilterPredicate(decimalAttr <=> BigDecimal.valueOf(1),
+ PredicateLeaf.Operator.NULL_SAFE_EQUALS)
- checkFilterPredicate($"_1" < BigDecimal.valueOf(2),
PredicateLeaf.Operator.LESS_THAN)
- checkFilterPredicate($"_1" > BigDecimal.valueOf(3),
PredicateLeaf.Operator.LESS_THAN_EQUALS)
- checkFilterPredicate($"_1" <= BigDecimal.valueOf(1),
PredicateLeaf.Operator.LESS_THAN_EQUALS)
- checkFilterPredicate($"_1" >= BigDecimal.valueOf(4),
PredicateLeaf.Operator.LESS_THAN)
+ checkFilterPredicate(decimalAttr < BigDecimal.valueOf(2),
PredicateLeaf.Operator.LESS_THAN)
+ checkFilterPredicate(decimalAttr > BigDecimal.valueOf(3),
+ PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate(decimalAttr <= BigDecimal.valueOf(1),
+ PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate(decimalAttr >= BigDecimal.valueOf(4),
PredicateLeaf.Operator.LESS_THAN)
checkFilterPredicate(
- Literal(BigDecimal.valueOf(1)) === $"_1",
PredicateLeaf.Operator.EQUALS)
+ Literal(BigDecimal.valueOf(1)) === decimalAttr,
PredicateLeaf.Operator.EQUALS)
checkFilterPredicate(
- Literal(BigDecimal.valueOf(1)) <=> $"_1",
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+ Literal(BigDecimal.valueOf(1)) <=> decimalAttr,
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
checkFilterPredicate(
- Literal(BigDecimal.valueOf(2)) > $"_1",
PredicateLeaf.Operator.LESS_THAN)
+ Literal(BigDecimal.valueOf(2)) > decimalAttr,
PredicateLeaf.Operator.LESS_THAN)
checkFilterPredicate(
- Literal(BigDecimal.valueOf(3)) < $"_1",
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ Literal(BigDecimal.valueOf(3)) < decimalAttr,
PredicateLeaf.Operator.LESS_THAN_EQUALS)
checkFilterPredicate(
- Literal(BigDecimal.valueOf(1)) >= $"_1",
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ Literal(BigDecimal.valueOf(1)) >= decimalAttr,
PredicateLeaf.Operator.LESS_THAN_EQUALS)
checkFilterPredicate(
- Literal(BigDecimal.valueOf(4)) <= $"_1",
PredicateLeaf.Operator.LESS_THAN)
+ Literal(BigDecimal.valueOf(4)) <= decimalAttr,
PredicateLeaf.Operator.LESS_THAN)
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]