This is an automated email from the ASF dual-hosted git repository.
vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new a4fa3451916 [HUDI-7033] Fix read error for schema evolution + partition value extraction (#9994)
a4fa3451916 is described below
commit a4fa3451916de11dc082792076b62013586dadaf
Author: voonhous <[email protected]>
AuthorDate: Wed Nov 8 10:49:48 2023 +0800
[HUDI-7033] Fix read error for schema evolution + partition value extraction (#9994)
---
.../org/apache/hudi/HoodieDataSourceHelper.scala | 61 +++++++++++++++++++++-
.../apache/hudi/TestHoodieDataSourceHelper.scala | 54 +++++++++++++++++++
.../org/apache/spark/sql/hudi/TestSpark3DDL.scala | 41 +++++++++++++++
3 files changed, 154 insertions(+), 2 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala
index eb8ddfdf870..4add21b5b8d 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.PredicateHelper
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
-import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.sources.{And, Filter, Or}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -58,7 +58,7 @@ object HoodieDataSourceHelper extends PredicateHelper with SparkAdapterSupport {
dataSchema = dataSchema,
partitionSchema = partitionSchema,
requiredSchema = requiredSchema,
- filters = filters,
+ filters = if (appendPartitionValues) getNonPartitionFilters(filters, dataSchema, partitionSchema) else filters,
options = options,
hadoopConf = hadoopConf
)
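The guarded call above matters only when partition values are appended from the path, because the Parquet data files then lack the partition columns a pushed-down filter may reference. A minimal sketch of the intended effect, using hypothetical schemas and a hypothetical filter rather than anything from this patch:

    import org.apache.spark.sql.sources.{And, EqualTo, Filter}
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

    // Hypothetical schemas: `id` lives in the data files, `region` only in the path.
    val dataSchema = StructType(Seq(StructField("id", IntegerType)))
    val partitionSchema = StructType(Seq(StructField("region", StringType)))

    // A mixed filter pushed down by Spark: region = 'reg1' AND id = 1.
    val filters: Seq[Filter] = Seq(And(EqualTo("region", "reg1"), EqualTo("id", 1)))

    // Only the data-column half survives; the partition-column half is dropped,
    // since the data files have no `region` column to evaluate it against.
    val pruned = HoodieDataSourceHelper.getNonPartitionFilters(filters, dataSchema, partitionSchema)
    // pruned == Seq(EqualTo("id", 1))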
@@ -98,4 +98,61 @@ object HoodieDataSourceHelper extends PredicateHelper with SparkAdapterSupport {
deserializer.deserialize(avroRecord).get.asInstanceOf[InternalRow]
}
}
+
+ def getNonPartitionFilters(filters: Seq[Filter], dataSchema: StructType, partitionSchema: StructType): Seq[Filter] = {
+ filters.flatMap(f => {
+ if (f.references.intersect(partitionSchema.fields.map(_.name)).nonEmpty) {
+ extractPredicatesWithinOutputSet(f, dataSchema.fieldNames.toSet)
+ } else {
+ Some(f)
+ }
+ })
+ }
+
+ /**
+ * Heavily adapted from {@see org.apache.spark.sql.catalyst.expressions.PredicateHelper#extractPredicatesWithinOutputSet}.
+ * The method is adapted to work with Filters instead of Expressions.
+ *
+ * @return the sub-predicate of `condition` that references only columns in `outputSet`, if one exists
+ */
+ def extractPredicatesWithinOutputSet(condition: Filter, outputSet: Set[String]): Option[Filter] = condition match {
+ case And(left, right) =>
+ val leftResultOptional = extractPredicatesWithinOutputSet(left, outputSet)
+ val rightResultOptional = extractPredicatesWithinOutputSet(right, outputSet)
+ (leftResultOptional, rightResultOptional) match {
+ case (Some(leftResult), Some(rightResult)) => Some(And(leftResult, rightResult))
+ case (Some(leftResult), None) => Some(leftResult)
+ case (None, Some(rightResult)) => Some(rightResult)
+ case _ => None
+ }
+
+ // The Or predicate is convertible when both of its children can be pushed down.
+ // That is to say, if one or both of the children can be partially pushed down, the Or
+ // predicate can be partially pushed down as well.
+ //
+ // Here is an example used to explain the reason.
+ // Let's say we have
+ // condition: (a1 AND a2) OR (b1 AND b2),
+ // outputSet: AttributeSet(a1, b1)
+ // a1 and b1 are convertible, while a2 and b2 are not.
+ // The predicate can be converted as
+ // (a1 OR b1) AND (a1 OR b2) AND (a2 OR b1) AND (a2 OR b2)
+ // As per the logic in the And case above, we can push down (a1 OR b1).
+ case Or(left, right) =>
+ for {
+ lhs <- extractPredicatesWithinOutputSet(left, outputSet)
+ rhs <- extractPredicatesWithinOutputSet(right, outputSet)
+ } yield Or(lhs, rhs)
+
+ // Here we assume all the `Not` operators are already below all the `And` and `Or` operators
+ // after the optimization rule `BooleanSimplification`, so that we don't need to handle the
+ // `Not` operators here.
+ case other =>
+ if (other.references.toSet.subsetOf(outputSet)) {
+ Some(other)
+ } else {
+ None
+ }
+ }
}
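To make the Or-branch comment concrete, here is a hedged walk-through of extractPredicatesWithinOutputSet on the (a1 AND a2) OR (b1 AND b2) example from the comment; attribute names and values are illustrative:

    import org.apache.spark.sql.sources.{And, EqualTo, Filter, Or}

    // condition: (a1 = 1 AND a2 = 2) OR (b1 = 1 AND b2 = 2), outputSet = {a1, b1}
    val condition: Filter = Or(
      And(EqualTo("a1", 1), EqualTo("a2", 2)),
      And(EqualTo("b1", 1), EqualTo("b2", 2)))

    // Each And keeps only its convertible child (a1 = 1 and b1 = 1, respectively);
    // the Or is then rebuilt because both children yielded a residual predicate.
    val result = HoodieDataSourceHelper.extractPredicatesWithinOutputSet(condition, Set("a1", "b1"))
    // result == Some(Or(EqualTo("a1", 1), EqualTo("b1", 1)))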
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieDataSourceHelper.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieDataSourceHelper.scala
new file mode 100644
index 00000000000..7f660136a30
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieDataSourceHelper.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.spark.sql.functions.expr
+import org.apache.spark.sql.sources.Filter
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+
+class TestHoodieDataSourceHelper extends SparkAdapterSupport {
+
+ def checkCondition(filter: Option[Filter], outputSet: Set[String], expected: Any): Unit = {
+ val actual = HoodieDataSourceHelper.extractPredicatesWithinOutputSet(filter.get, outputSet)
+ assertEquals(expected, actual)
+ }
+
+ @Test
+ def testExtractPredicatesWithinOutputSet(): Unit = {
+ val dataColsWithNoPartitionCols = Set("id", "extra_col")
+
+ val expr1 = sparkAdapter.translateFilter(expr("(region='reg2' and id = 1) or region='reg1'").expr)
+ checkCondition(expr1, dataColsWithNoPartitionCols, None)
+
+ val expr2 = sparkAdapter.translateFilter(expr("region='reg2' and id = 1").expr)
+ val expectedExpr2 = sparkAdapter.translateFilter(expr("id = 1").expr)
+ checkCondition(expr2, dataColsWithNoPartitionCols, expectedExpr2)
+
+ // not (region='reg2' and id = 1) -- BooleanSimplification --> not region='reg2' or not id = 1
+ val expr3 = sparkAdapter.translateFilter(expr("not region='reg2' or not id = 1").expr)
+ checkCondition(expr3, dataColsWithNoPartitionCols, None)
+
+ // not (region='reg2' or id = 1) -- BooleanSimplification --> not region='reg2' and not id = 1
+ val expr4 = sparkAdapter.translateFilter(expr("not region='reg2' and not id = 1").expr)
+ val expectedExpr4 = sparkAdapter.translateFilter(expr("not(id=1)").expr)
+ checkCondition(expr4, dataColsWithNoPartitionCols, expectedExpr4)
+ }
+
+}
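The expr3/expr4 cases rely on the assumption, noted in the comments, that BooleanSimplification has already pushed Not below And and Or. A small sketch of that De Morgan rewrite with the same source-filter types (names and values are illustrative):

    import org.apache.spark.sql.sources.{EqualTo, Not, Or}

    // NOT (region = 'reg2' AND id = 1)  ==>  NOT(region = 'reg2') OR NOT(id = 1)
    val rewritten = Or(Not(EqualTo("region", "reg2")), Not(EqualTo("id", 1)))

    // With outputSet = Set("id", "extra_col") this yields None: an Or survives
    // only if both children are convertible, and NOT(region = 'reg2') is not.
    val res = HoodieDataSourceHelper.extractPredicatesWithinOutputSet(rewritten, Set("id", "extra_col"))
    // res == None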
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
index 137efba2861..6ca1a72edcd 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
@@ -1015,4 +1015,45 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
}
}
}
+
+ test("Test extract partition values from path when schema evolution is
enabled") {
+ withTable(generateTableName) { tableName =>
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | ts bigint,
+ | region string,
+ | dt date
+ |) using hudi
+ |tblproperties (
+ | primaryKey = 'id',
+ | type = 'cow',
+ | preCombineField = 'ts'
+ |)
+ |partitioned by (region, dt)""".stripMargin)
+
+ withSQLConf("hoodie.datasource.read.extract.partition.values.from.path" -> "true",
+ "hoodie.schema.on.read.enable" -> "true") {
+ spark.sql(s"insert into $tableName partition (region='reg1',
dt='2023-10-01') " +
+ s"select 1, 'name1', 1000")
+ checkAnswer(s"select id, name, ts, region, cast(dt as string) from
$tableName where region='reg1'")(
+ Seq(1, "name1", 1000, "reg1", "2023-10-01")
+ )
+
+ // apply schema evolution and perform a read again
+ spark.sql(s"alter table $tableName add columns(price double)")
+ checkAnswer(s"select id, name, ts, region, cast(dt as string) from
$tableName where region='reg1'")(
+ Seq(1, "name1", 1000, "reg1", "2023-10-01")
+ )
+
+ // ensure this won't be broken in the future
+ // BooleanSimplification is always applied when calling HoodieDataSourceHelper#getNonPartitionFilters
+ checkAnswer(s"select id, name, ts, region, cast(dt as string) from
$tableName where not(region='reg2' or id=2)")(
+ Seq(1, "name1", 1000, "reg1", "2023-10-01")
+ )
+ }
+ }
+ }
}
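For context, a hedged end-to-end sketch of the user-facing scenario the new DDL test exercises; the table path and data are illustrative, while the two option keys come from the test above:

    // Read a table with both configs enabled; before this fix, a filter on a
    // partition column could fail the read when schema evolution was enabled.
    val df = spark.read.format("hudi")
      .option("hoodie.datasource.read.extract.partition.values.from.path", "true")
      .option("hoodie.schema.on.read.enable", "true")
      .load("/tmp/hudi/example_table") // hypothetical path
    df.filter("region = 'reg1'").show()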