This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 3fc2c8ed97012fa698b302bb036b25ff3259cdad
Author: Alexey Kudinkin <[email protected]>
AuthorDate: Thu Feb 2 21:39:40 2023 -0800

    [HUDI-5691] Fixing `HoodiePruneFileSourcePartitions` to properly handle non-partitioned tables (#7833)

    This change addresses the issue of the `HoodiePruneFileSourcePartitions`
    rule not being applied to non-partitioned tables, resulting in their
    size being incorrectly estimated by Spark.
---
 .../analysis/HoodiePruneFileSourcePartitions.scala |  2 +-
 .../TestHoodiePruneFileSourcePartitions.scala      | 40 ++++++++++++++--------
 2 files changed, 27 insertions(+), 15 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodiePruneFileSourcePartitions.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodiePruneFileSourcePartitions.scala
index 3b86777e16e..46cb931a59b 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodiePruneFileSourcePartitions.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodiePruneFileSourcePartitions.scala
@@ -41,7 +41,7 @@ case class HoodiePruneFileSourcePartitions(spark: SparkSession) extends Rule[Log
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
     case op @ PhysicalOperation(projects, filters, lr @ LogicalRelation(HoodieRelationMatcher(fileIndex), _, _, _))
-      if sparkAdapter.isHoodieTable(lr, spark) && fileIndex.partitionSchema.nonEmpty && !fileIndex.hasPredicatesPushedDown =>
+      if sparkAdapter.isHoodieTable(lr, spark) && !fileIndex.hasPredicatesPushedDown =>
 
       val deterministicFilters = filters.filter(f => f.deterministic && !SubqueryExpression.hasSubquery(f))
       val normalizedFilters = exprUtils.normalizeExprs(deterministicFilters, lr.output)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/analysis/TestHoodiePruneFileSourcePartitions.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/analysis/TestHoodiePruneFileSourcePartitions.scala
index 06239697db9..aac2a4027a2 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/analysis/TestHoodiePruneFileSourcePartitions.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/analysis/TestHoodiePruneFileSourcePartitions.scala
@@ -54,8 +54,11 @@ class TestHoodiePruneFileSourcePartitions extends HoodieClientTestBase with Scal
   )
 
   @ParameterizedTest
-  @CsvSource(value = Array("cow", "mor"))
-  def testPartitionFiltersPushDown(tableType: String): Unit = {
+  @CsvSource(value = Array(
+    "cow,true", "cow,false",
+    "mor,true", "mor,false"
+  ))
+  def testPartitionFiltersPushDown(tableType: String, partitioned: Boolean): Unit = {
     spark.sql(
       s"""
          |CREATE TABLE $tableName (
@@ -65,7 +68,7 @@ class TestHoodiePruneFileSourcePartitions extends HoodieClientTestBase with Scal
          |  ts long,
          |  partition string
          |) USING hudi
-         |PARTITIONED BY (partition)
+         |${if (partitioned) "PARTITIONED BY (partition)" else ""}
          |TBLPROPERTIES (
          |  type = '$tableType',
          |  primaryKey = 'id',
@@ -103,27 +106,37 @@ class TestHoodiePruneFileSourcePartitions extends HoodieClientTestBase with Scal
           //       support (for partition-pruning) will only occur during execution phase, while file-listing
           //       actually happens during analysis stage
           case "eager" =>
-            assertEquals(1275, f.stats.sizeInBytes.longValue() / 1024)
-            assertEquals(1275, lr.stats.sizeInBytes.longValue() / 1024)
+            // NOTE: In case of partitioned table 3 files will be created, while in case of non-partitioned just 1
+            if (partitioned) {
+              assertEquals(1275, f.stats.sizeInBytes.longValue() / 1024)
+              assertEquals(1275, lr.stats.sizeInBytes.longValue() / 1024)
+            } else {
+              // NOTE: We're adding 512 to make sure we always round to the next integer value
+              assertEquals(425, (f.stats.sizeInBytes.longValue() + 512) / 1024)
+              assertEquals(425, (lr.stats.sizeInBytes.longValue() + 512) / 1024)
+            }
 
           // Case #2: Lazy listing (default mode).
           //          In case of lazy listing mode, Hudi will only list partitions matching partition-predicates that are
           //          eagerly pushed down (w/ help of [[HoodiePruneFileSourcePartitions]]) avoiding the necessity to
           //          list the whole table
           case "lazy" =>
-            assertEquals(425, f.stats.sizeInBytes.longValue() / 1024)
-            assertEquals(425, lr.stats.sizeInBytes.longValue() / 1024)
+            // NOTE: We're adding 512 to make sure we always round to the next integer value
+            assertEquals(425, (f.stats.sizeInBytes.longValue() + 512) / 1024)
+            assertEquals(425, (lr.stats.sizeInBytes.longValue() + 512) / 1024)
 
           case _ => throw new UnsupportedOperationException()
         }
 
-        val executionPlan = df.queryExecution.executedPlan
-        val expectedPhysicalPlanPartitionFiltersClause = tableType match {
-          case "cow" => s"PartitionFilters: [isnotnull($attr), ($attr = 2021-01-05)]"
-          case "mor" => s"PushedFilters: [IsNotNull(partition), EqualTo(partition,2021-01-05)]"
-        }
+        if (partitioned) {
+          val executionPlan = df.queryExecution.executedPlan
+          val expectedPhysicalPlanPartitionFiltersClause = tableType match {
+            case "cow" => s"PartitionFilters: [isnotnull($attr), ($attr = 2021-01-05)]"
+            case "mor" => s"PushedFilters: [IsNotNull(partition), EqualTo(partition,2021-01-05)]"
+          }
 
-        Assertions.assertTrue(executionPlan.toString().contains(expectedPhysicalPlanPartitionFiltersClause))
+          Assertions.assertTrue(executionPlan.toString().contains(expectedPhysicalPlanPartitionFiltersClause))
+        }
 
       case _ =>
         val failureHint =
@@ -224,5 +237,4 @@ class TestHoodiePruneFileSourcePartitions extends HoodieClientTestBase with Scal
 
     }
   }
-
 }
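For readers skimming the diff, the heart of the fix is the relaxed pattern guard. Below is a minimal, standalone Scala sketch of the before/after behavior; it is not the actual Hudi/Spark code, and FileIndexStub with its fields is a hypothetical stand-in for the real HoodieFileIndex:

    object PruneRuleGuardSketch extends App {
      // Hypothetical stand-in: a non-partitioned table has an empty partition schema.
      case class FileIndexStub(partitionSchema: Seq[String], hasPredicatesPushedDown: Boolean)

      // Old guard: the partitionSchema.nonEmpty check means a non-partitioned
      // table never enters the rule, so Spark keeps its default whole-table
      // size estimate for the relation.
      def oldGuard(fi: FileIndexStub): Boolean =
        fi.partitionSchema.nonEmpty && !fi.hasPredicatesPushedDown

      // New guard: the rule now fires for any Hudi relation whose predicates
      // haven't been pushed down yet, partitioned or not.
      def newGuard(fi: FileIndexStub): Boolean =
        !fi.hasPredicatesPushedDown

      val nonPartitioned = FileIndexStub(partitionSchema = Seq.empty, hasPredicatesPushedDown = false)
      println(oldGuard(nonPartitioned)) // false -- rule skipped before the fix
      println(newGuard(nonPartitioned)) // true  -- rule applies after the fix
    }

A note on the updated test assertions: switching from sizeInBytes / 1024 to (sizeInBytes + 512) / 1024 rounds a byte count to the nearest kilobyte under integer division instead of always truncating down, which keeps the 425 KB expectation stable against small variations in the generated file sizes.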
