This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 1767ff5e7c [HUDI-4161] Make sure partition values are taken from partition path (#5699)
1767ff5e7c is described below

commit 1767ff5e7c7d7a06d0267380eb3b830392a3bf28
Author: Alexey Kudinkin <[email protected]>
AuthorDate: Fri May 27 02:36:30 2022 -0700

    [HUDI-4161] Make sure partition values are taken from partition path (#5699)
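
    For context, the diff below references the read option
    DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH, which
    lets any read request this behavior explicitly. A minimal sketch of that
    usage (basePath is an illustrative variable, not part of this change):

        import org.apache.hudi.DataSourceReadOptions

        val df = spark.read.format("hudi")
          .option(DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.key, "true")
          .load(basePath)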
---
 .../org/apache/hudi/BaseFileOnlyRelation.scala      | 10 ++++++++++
 .../scala/org/apache/hudi/HoodieBaseRelation.scala  |  4 ++--
 .../scala/org/apache/hudi/HoodieFileIndex.scala     |  5 +----
 .../apache/hudi/functional/TestCOWDataSource.scala  | 21 ++++++++++++++-------
 .../apache/hudi/functional/TestMORDataSource.scala  | 14 +++++++-------
 5 files changed, 34 insertions(+), 20 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
index c57f46a7b6..4160c34b0c 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
@@ -54,6 +54,16 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
 
   override type FileSplit = HoodieBaseFileSplit
 
+  // TODO(HUDI-3204) this is to override behavior (exclusively) for COW tables to always extract
+  //                 partition values from partition path
+  //                 For more details please check HUDI-4161
+  // NOTE: This override has to mirror semantic of whenever this Relation is converted into [[HadoopFsRelation]],
+  //       which is currently done for all cases, except when Schema Evolution is enabled
+  override protected val shouldExtractPartitionValuesFromPartitionPath: Boolean = {
+    val enableSchemaOnRead = !internalSchema.isEmptySchema
+    !enableSchemaOnRead
+  }
+
   override lazy val mandatoryFields: Seq[String] =
  // TODO reconcile, record's key shouldn't be mandatory for base-file only relation
     Seq(recordKeyField)
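
Observable effect of the override above, per the test updates further down: a
COW table written with TimestampBasedKeyGenerator and an output date format of
yyyy/MM/dd now reads its partition column back in the path-formatted shape. A
hedged sketch (values illustrative, taken from the tests below):

    // snapshot read of such a COW table; partition values come from the path
    val snapshot = spark.read.format("hudi").load(basePath)
    snapshot.where("partition = '2022/01/01'")  // matches; '2022-01-01' no longer does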
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index 4b7177f4d6..08f87816d7 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -171,7 +171,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
   protected val shouldExtractPartitionValuesFromPartitionPath: Boolean = {
     // Controls whether partition columns (which are the source for the partition path values) should
     // be omitted from persistence in the data files. On the read path it affects whether partition values (values
-    // of partition columns) will be read from the data file ot extracted from partition path
+    // of partition columns) will be read from the data file or extracted from partition path
     val shouldOmitPartitionColumns = metaClient.getTableConfig.shouldDropPartitionColumns && partitionColumns.nonEmpty
     val shouldExtractPartitionValueFromPath =
       optParams.getOrElse(DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.key,
@@ -419,7 +419,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
       }
     } catch {
       case NonFatal(e) =>
-        logWarning(s"Failed to get the right partition InternalRow for file : ${file.toString}")
+        logWarning(s"Failed to get the right partition InternalRow for file: ${file.toString}", e)
         InternalRow.empty
     }
   }
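
Note the first hunk above ends before the two flags are combined; presumably
either condition enables path extraction. A simplified sketch of that
resolution (names follow the diff; the final disjunction is an assumption, not
shown in the hunk):

    // assumption: dropping partition columns at write time, or the explicit
    // read option, forces extracting values from the partition path
    val shouldExtract = shouldOmitPartitionColumns || shouldExtractPartitionValueFromPath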
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index 08d0d722b2..d73e3a5d3b 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -108,9 +108,6 @@ case class HoodieFileIndex(spark: SparkSession,
   * @return list of PartitionDirectory containing partition to base files mapping
    */
   override def listFiles(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
-    val convertedPartitionFilters =
-      HoodieFileIndex.convertFilterForTimestampKeyGenerator(metaClient, partitionFilters)
-
     // Look up candidate files names in the col-stats index, if all of the following conditions are true
     //    - Data-skipping is enabled
     //    - Col-Stats Index is present
@@ -144,7 +141,7 @@ case class HoodieFileIndex(spark: SparkSession,
       Seq(PartitionDirectory(InternalRow.empty, candidateFiles))
     } else {
       // Prune the partition path by the partition filters
-      val prunedPartitions = prunePartition(cachedAllInputFileSlices.keySet.asScala.toSeq, convertedPartitionFilters)
+      val prunedPartitions = prunePartition(cachedAllInputFileSlices.keySet.asScala.toSeq, partitionFilters)
       var totalFileSize = 0
       var candidateFileSize = 0
 
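
With convertFilterForTimestampKeyGenerator no longer applied, the raw
partitionFilters are matched directly against partition-path values, so
predicates must use the path-formatted form. A hedged usage sketch mirroring
the updated tests (format and values illustrative):

    // TimestampBasedKeyGenerator with output date format 'yyyy/MM/dd':
    // filter on the path form '2022/01/01', not the source value '2022-01-01'
    spark.read.format("hudi").load(basePath).where("partition = '2022/01/01'")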
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index 088ec1faab..7c86da0c9e 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -144,7 +144,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
   def testPrunePartitionForTimestampBasedKeyGenerator(): Unit = {
     val options = commonOpts ++ Map(
       "hoodie.compact.inline" -> "false",
-      DataSourceWriteOptions.TABLE_TYPE.key -> DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
+      DataSourceWriteOptions.TABLE_TYPE.key -> DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
       DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> "org.apache.hudi.keygen.TimestampBasedKeyGenerator",
       Config.TIMESTAMP_TYPE_FIELD_PROP -> "DATE_STRING",
       Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP -> "yyyy/MM/dd",
@@ -176,8 +176,11 @@ class TestCOWDataSource extends HoodieClientTestBase {
 
     // snapshot query
     val snapshotQueryRes = spark.read.format("hudi").load(basePath)
-    assertEquals(snapshotQueryRes.where("partition = '2022-01-01'").count, 20)
-    assertEquals(snapshotQueryRes.where("partition = '2022-01-02'").count, 30)
+    // TODO(HUDI-3204) we have to revert this to pre-existing behavior from 0.10
+    //assertEquals(snapshotQueryRes.where("partition = '2022-01-01'").count, 20)
+    //assertEquals(snapshotQueryRes.where("partition = '2022-01-02'").count, 30)
+    assertEquals(snapshotQueryRes.where("partition = '2022/01/01'").count, 20)
+    assertEquals(snapshotQueryRes.where("partition = '2022/01/02'").count, 30)
 
     // incremental query
     val incrementalQueryRes = spark.read.format("hudi")
@@ -961,10 +964,14 @@ class TestCOWDataSource extends HoodieClientTestBase {
     assert(firstDF.count() == 2)
 
     // data_date is the partition field. Persist to the parquet file using the origin values, and read it.
-    assertEquals(
-      Seq("2018-09-23", "2018-09-24"),
-      firstDF.select("data_date").map(_.get(0).toString).collect().sorted.toSeq
-    )
+    // TODO(HUDI-3204) we have to revert this to pre-existing behavior from 0.10
+    val expectedValues = if (useGlobbing) {
+      Seq("2018-09-23", "2018-09-24")
+    } else {
+      Seq("2018/09/23", "2018/09/24")
+    }
+
+    assertEquals(expectedValues, firstDF.select("data_date").map(_.get(0).toString).collect().sorted.toSeq)
     assertEquals(
       Seq("2018/09/23", "2018/09/24"),
      firstDF.select("_hoodie_partition_path").map(_.get(0).toString).collect().sorted.toSeq
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
index 96514603ef..f9f1443893 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
@@ -17,11 +17,10 @@
 
 package org.apache.hudi.functional
 
-import org.apache.avro.generic.GenericRecord
 import org.apache.hadoop.fs.Path
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.common.config.HoodieMetadataConfig
-import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecord, HoodieRecordPayload, HoodieTableType}
+import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieTableType}
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
 import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
@@ -30,9 +29,8 @@ import org.apache.hudi.index.HoodieIndex.IndexType
 import org.apache.hudi.keygen.NonpartitionedKeyGenerator
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config
 import org.apache.hudi.testutils.{DataSourceTestUtils, HoodieClientTestBase}
-import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers, HoodieSparkUtils, SparkDatasetMixin}
+import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers, SparkDatasetMixin}
 import org.apache.log4j.LogManager
-import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types.BooleanType
@@ -41,7 +39,6 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.CsvSource
 
-import java.util
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
 
@@ -864,8 +861,11 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin {
     val readOptimizedQueryRes = spark.read.format("hudi")
       .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
       .load(basePath)
-    assertEquals(readOptimizedQueryRes.where("partition = '2022-01-01'").count, 50)
-    assertEquals(readOptimizedQueryRes.where("partition = '2022-01-02'").count, 60)
+    // TODO(HUDI-3204) we have to revert this to pre-existing behavior from 0.10
+    //assertEquals(readOptimizedQueryRes.where("partition = '2022-01-01'").count, 50)
+    //assertEquals(readOptimizedQueryRes.where("partition = '2022-01-02'").count, 60)
+    assertEquals(readOptimizedQueryRes.where("partition = '2022/01/01'").count, 50)
+    assertEquals(readOptimizedQueryRes.where("partition = '2022/01/02'").count, 60)
 
     // incremental query
     val incrementalQueryRes = spark.read.format("hudi")
