This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new f1f0109ab8 [HUDI-4440] Treat boostrapped table as non-partitioned in 
HudiFileIndex if partition column is missing from schema (#6163)
f1f0109ab8 is described below

commit f1f0109ab89f7317c89c6d1d51545a24f12b4692
Author: Rahil C <[email protected]>
AuthorDate: Sat Jul 23 11:44:40 2022 -0700

    [HUDI-4440] Treat boostrapped table as non-partitioned in HudiFileIndex if 
partition column is missing from schema (#6163)
    
    Co-authored-by: Ryan Pifer <[email protected]>
---
 .../apache/hudi/SparkHoodieTableFileIndex.scala    | 23 +++++++++---
 .../functional/TestDataSourceForBootstrap.scala    | 42 ++++++++++++++--------
 2 files changed, 47 insertions(+), 18 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
index cd1c1fb4af..4e70ebad75 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
@@ -22,6 +22,7 @@ import org.apache.hudi.BaseHoodieTableFileIndex.PartitionPath
 import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, 
QUERY_TYPE_INCREMENTAL_OPT_VAL, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, 
QUERY_TYPE_SNAPSHOT_OPT_VAL}
 import org.apache.hudi.SparkHoodieTableFileIndex.{deduceQueryType, 
generateFieldMap, toJavaOption}
 import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.bootstrap.index.BootstrapIndex
 import org.apache.hudi.common.config.TypedProperties
 import org.apache.hudi.common.model.{FileSlice, HoodieTableQueryType}
 import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
@@ -96,10 +97,24 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
         val partitionFields = partitionColumns.get().map(column => 
StructField(column, StringType))
         StructType(partitionFields)
       } else {
-        val partitionFields = partitionColumns.get().map(column =>
-          nameFieldMap.getOrElse(column, throw new 
IllegalArgumentException(s"Cannot find column: '" +
-            s"$column' in the schema[${schema.fields.mkString(",")}]")))
-        StructType(partitionFields)
+        val partitionFields = partitionColumns.get().filter(column => 
nameFieldMap.contains(column))
+          .map(column => nameFieldMap.apply(column))
+
+        if (partitionFields.size != partitionColumns.get().size) {
+          val isBootstrapTable = 
BootstrapIndex.getBootstrapIndex(metaClient).useIndex()
+          if (isBootstrapTable) {
+            // For bootstrapped tables its possible the schema does not 
contain partition field when source table
+            // is hive style partitioned. In this case we would like to treat 
the table as non-partitioned
+            // as opposed to failing
+            new StructType()
+          } else {
+            throw new IllegalArgumentException(s"Cannot find columns: " +
+              s"'${partitionColumns.get().filter(col => 
!nameFieldMap.contains(col)).mkString(",")}' " +
+              s"in the schema[${schema.fields.mkString(",")}]")
+          }
+        } else {
+          new StructType(partitionFields)
+        }
       }
     } else {
       // If the partition columns have not stored in hoodie.properties(the 
table that was
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
index eeed5fe75b..f0dd89df1c 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
@@ -109,9 +109,12 @@ class TestDataSourceForBootstrap {
     // check marked directory clean up
     assert(!fs.exists(new Path(basePath, ".hoodie/.temp/00000000000001")))
 
-    // Read bootstrapped table and verify count
-    var hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*")
+    // Read bootstrapped table and verify count using glob path
+    val hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*")
     assertEquals(numRecords, hoodieROViewDF1.count())
+    // Read bootstrapped table and verify count using Hudi file index
+    val hoodieROViewDF2 = spark.read.format("hudi").load(basePath)
+    assertEquals(numRecords, hoodieROViewDF2.count())
 
     // Perform upsert
     val updateTimestamp = Instant.now.toEpochMilli
@@ -130,11 +133,11 @@ class TestDataSourceForBootstrap {
     val commitInstantTime2: String = HoodieDataSourceHelpers.latestCommit(fs, 
basePath)
     assertEquals(1, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, 
commitInstantTime1).size())
 
-    // Read table after upsert and verify count
-    hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*")
-    assertEquals(numRecords, hoodieROViewDF1.count())
-    assertEquals(numRecordsUpdate, hoodieROViewDF1.filter(s"timestamp == 
$updateTimestamp").count())
-    // Read without *
+    // Read table after upsert and verify count using glob path
+    val hoodieROViewDF3 = spark.read.format("hudi").load(basePath + "/*")
+    assertEquals(numRecords, hoodieROViewDF3.count())
+    assertEquals(numRecordsUpdate, hoodieROViewDF3.filter(s"timestamp == 
$updateTimestamp").count())
+    // Read with base path using Hudi file index
     val hoodieROViewDF1WithBasePath = spark.read.format("hudi").load(basePath)
     assertEquals(numRecords, hoodieROViewDF1WithBasePath.count())
     assertEquals(numRecordsUpdate, 
hoodieROViewDF1WithBasePath.filter(s"timestamp == $updateTimestamp").count())
@@ -169,6 +172,9 @@ class TestDataSourceForBootstrap {
     // Read bootstrapped table and verify count
     val hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*")
     assertEquals(numRecords, hoodieROViewDF1.count())
+    // Read bootstrapped table and verify count using Hudi file index
+    val hoodieROViewDF2 = spark.read.format("hudi").load(basePath)
+    assertEquals(numRecords, hoodieROViewDF2.count())
 
     // Perform upsert
     val updateTimestamp = Instant.now.toEpochMilli
@@ -189,10 +195,14 @@ class TestDataSourceForBootstrap {
     val commitInstantTime2: String = HoodieDataSourceHelpers.latestCommit(fs, 
basePath)
     assertEquals(1, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, 
commitInstantTime1).size())
 
-    // Read table after upsert and verify count
-    val hoodieROViewDF2 = spark.read.format("hudi").load(basePath + "/*")
-    assertEquals(numRecords, hoodieROViewDF2.count())
-    assertEquals(numRecordsUpdate, hoodieROViewDF2.filter(s"timestamp == 
$updateTimestamp").count())
+    // Read table after upsert and verify count using glob path
+    val hoodieROViewDF3 = spark.read.format("hudi").load(basePath + "/*")
+    assertEquals(numRecords, hoodieROViewDF3.count())
+    assertEquals(numRecordsUpdate, hoodieROViewDF3.filter(s"timestamp == 
$updateTimestamp").count())
+    // Read table after upsert and verify count using Hudi file index
+    val hoodieROViewDF4 = spark.read.format("hudi").load(basePath)
+    assertEquals(numRecords, hoodieROViewDF4.count())
+    assertEquals(numRecordsUpdate, hoodieROViewDF3.filter(s"timestamp == 
$updateTimestamp").count())
 
     verifyIncrementalViewResult(commitInstantTime1, commitInstantTime2, 
isPartitioned = true, isHiveStylePartitioned = true)
   }
@@ -219,10 +229,10 @@ class TestDataSourceForBootstrap {
     val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
       DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL, Some("datestr"))
 
-    // Read bootstrapped table and verify count
+    // Read bootstrapped table and verify count using glob path
     val hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*")
     assertEquals(numRecords, hoodieROViewDF1.count())
-    // Read without *
+    // Read with base path using Hudi file index
     val hoodieROViewWithBasePathDF1 = spark.read.format("hudi").load(basePath)
     assertEquals(numRecords, hoodieROViewWithBasePathDF1.count())
 
@@ -260,10 +270,14 @@ class TestDataSourceForBootstrap {
     val commitInstantTime3: String = HoodieDataSourceHelpers.latestCommit(fs, 
basePath)
     assertEquals(2, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, 
commitInstantTime1).size())
 
-    // Read table after upsert and verify count
+    // Read table after upsert and verify count using glob paths
     val hoodieROViewDF3 = spark.read.format("hudi").load(basePath + "/*")
     assertEquals(numRecords, hoodieROViewDF3.count())
     assertEquals(numRecordsUpdate, hoodieROViewDF3.filter(s"timestamp == 
$updateTimestamp").count())
+    // Read table after upsert and verify count using Hudi file index
+    val hoodieROViewDF4 = spark.read.format("hudi").load(basePath)
+    assertEquals(numRecords, hoodieROViewDF4.count())
+    assertEquals(numRecordsUpdate, hoodieROViewDF4.filter(s"timestamp == 
$updateTimestamp").count())
 
     verifyIncrementalViewResult(commitInstantTime1, commitInstantTime3, 
isPartitioned = true, isHiveStylePartitioned = false)
   }

Reply via email to