This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 89f482e  [HUDI-1489] Fix null pointer exception when reading updated written bootstrap table (#2370)
89f482e is described below

commit 89f482eaf249bf89b7182c4ca97ffda0929b4b7e
Author: wenningd <[email protected]>
AuthorDate: Wed Dec 23 11:26:24 2020 -0800

    [HUDI-1489] Fix null pointer exception when reading updated written bootstrap table (#2370)
    
    Co-authored-by: Wenning Ding <[email protected]>
---
 .../org/apache/hudi/HoodieBootstrapRelation.scala  |   6 +-
 .../functional/TestDataSourceForBootstrap.scala    | 351 +++++++--------------
 2 files changed, 118 insertions(+), 239 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala
index a1e9947..f7415f9 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala
@@ -91,6 +91,9 @@ class HoodieBootstrapRelation(@transient val _sqlContext: SQLContext,
     // Get required schemas for column pruning
     var requiredDataSchema = StructType(Seq())
     var requiredSkeletonSchema = StructType(Seq())
+    // requiredColsSchema is the schema of requiredColumns, note that requiredColumns is in a random order
+    // so requiredColsSchema is not always equal to (requiredSkeletonSchema.fields ++ requiredDataSchema.fields)
+    var requiredColsSchema = StructType(Seq())
     requiredColumns.foreach(col => {
       var field = dataSchema.find(_.name == col)
       if (field.isDefined) {
@@ -99,6 +102,7 @@ class HoodieBootstrapRelation(@transient val _sqlContext: SQLContext,
         field = skeletonSchema.find(_.name == col)
         requiredSkeletonSchema = requiredSkeletonSchema.add(field.get)
       }
+      requiredColsSchema = requiredColsSchema.add(field.get)
     })
 
     // Prepare readers for reading data file and skeleton files
@@ -129,7 +133,7 @@ class HoodieBootstrapRelation(@transient val _sqlContext: SQLContext,
         sparkSession = _sqlContext.sparkSession,
         dataSchema = fullSchema,
         partitionSchema = StructType(Seq.empty),
-        requiredSchema = StructType(requiredSkeletonSchema.fields ++ requiredDataSchema.fields),
+        requiredSchema = requiredColsSchema,
         filters = filters,
         options = Map.empty,
         hadoopConf = _sqlContext.sparkSession.sessionState.newHadoopConf())
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
index f24e5ad..2a6a0a7 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
@@ -17,9 +17,6 @@
 
 package org.apache.hudi.functional
 
-import java.time.Instant
-import java.util.Collections
-
 import collection.JavaConverters._
 import org.apache.hadoop.fs.FileSystem
 import org.apache.hudi.bootstrap.SparkParquetBootstrapDataProvider
@@ -37,10 +34,13 @@ import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.junit.jupiter.api.io.TempDir
 
+import java.time.Instant
+import java.util.Collections
+
 class TestDataSourceForBootstrap {
 
   var spark: SparkSession = _
-  val commonOpts = Map(
+  val commonOpts: Map[String, String] = Map(
     HoodieWriteConfig.INSERT_PARALLELISM -> "4",
     HoodieWriteConfig.UPSERT_PARALLELISM -> "4",
     HoodieWriteConfig.DELETE_PARALLELISM -> "4",
@@ -56,6 +56,14 @@ class TestDataSourceForBootstrap {
   var srcPath: String = _
   var fs: FileSystem = _
 
+  val partitionPaths: List[String] = List("2020-04-01", "2020-04-02", 
"2020-04-03")
+  val numRecords: Int = 100
+  val numRecordsUpdate: Int = 10
+  val verificationRowKey: String = "trip_0"
+  val verificationCol: String = "driver"
+  val originalVerificationVal: String = "driver_0"
+  val updatedVerificationVal: String = "driver_update"
+
   @BeforeEach def initialize(@TempDir tempDir: java.nio.file.Path) {
     spark = SparkSession.builder
       .appName("Hoodie Datasource test")
@@ -83,7 +91,6 @@ class TestDataSourceForBootstrap {
 
   @Test def testMetadataBootstrapCOWNonPartitioned(): Unit = {
     val timestamp = Instant.now.toEpochMilli
-    val numRecords = 100
     val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext)
 
     val sourceDF = TestBootstrap.generateTestRawTripDataset(timestamp, 0, 
numRecords, Collections.emptyList(), jsc,
@@ -96,20 +103,7 @@ class TestDataSourceForBootstrap {
       .save(srcPath)
 
     // Perform bootstrap
-    val bootstrapDF = spark.emptyDataFrame
-    bootstrapDF.write
-      .format("hudi")
-      .options(commonOpts)
-      .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
-      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, 
DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
-      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
-      .option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP, srcPath)
-      .option(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS, 
"org.apache.hudi.keygen.NonpartitionedKeyGenerator")
-      .mode(SaveMode.Overwrite)
-      .save(basePath)
-
-    val commitInstantTime1: String = HoodieDataSourceHelpers.latestCommit(fs, 
basePath)
-    assertEquals(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS, 
commitInstantTime1)
+    val commitInstantTime1 = 
runMetadataBootstrapAndVerifyCommit(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
 
     // Read bootstrapped table and verify count
     var hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*")
@@ -117,18 +111,14 @@ class TestDataSourceForBootstrap {
 
     // Perform upsert
     val updateTimestamp = Instant.now.toEpochMilli
-    val numRecordsUpdate = 10
     val updateDF = TestBootstrap.generateTestRawTripDataset(updateTimestamp, 
0, numRecordsUpdate,
       Collections.emptyList(), jsc, spark.sqlContext)
 
     updateDF.write
       .format("hudi")
       .options(commonOpts)
-      .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
       .option(DataSourceWriteOptions.OPERATION_OPT_KEY, 
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
       .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, 
DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
-      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
-      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "timestamp")
       .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, 
"org.apache.hudi.keygen.NonpartitionedKeyGenerator")
       .mode(SaveMode.Append)
       .save(basePath)
@@ -141,36 +131,11 @@ class TestDataSourceForBootstrap {
     assertEquals(numRecords, hoodieROViewDF1.count())
     assertEquals(numRecordsUpdate, hoodieROViewDF1.filter(s"timestamp == 
$updateTimestamp").count())
 
-    // incrementally pull only changes in the bootstrap commit, which would 
pull all the initial records written
-    // during bootstrap
-    val hoodieIncViewDF1 = spark.read.format("hudi")
-      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
-      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "000")
-      .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, 
commitInstantTime1)
-      .load(basePath)
-
-    assertEquals(numRecords, hoodieIncViewDF1.count())
-    var countsPerCommit = 
hoodieIncViewDF1.groupBy("_hoodie_commit_time").count().collect();
-    assertEquals(1, countsPerCommit.length)
-    assertEquals(commitInstantTime1, countsPerCommit(0).get(0))
-
-    // incrementally pull only changes in the latest commit, which would pull 
only the updated records in the
-    // latest commit
-    val hoodieIncViewDF2 = spark.read.format("hudi")
-      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
-      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, 
commitInstantTime1)
-      .load(basePath);
-
-    assertEquals(numRecordsUpdate, hoodieIncViewDF2.count())
-    countsPerCommit = 
hoodieIncViewDF2.groupBy("_hoodie_commit_time").count().collect();
-    assertEquals(1, countsPerCommit.length)
-    assertEquals(commitInstantTime2, countsPerCommit(0).get(0))
+    verifyIncrementalViewResult(commitInstantTime1, commitInstantTime2, 
isPartitioned = false, isHiveStylePartitioned = false)
   }
 
   @Test def testMetadataBootstrapCOWHiveStylePartitioned(): Unit = {
     val timestamp = Instant.now.toEpochMilli
-    val numRecords = 100
-    val partitionPaths = List("2020-04-01", "2020-04-02", "2020-04-03")
     val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext)
 
     val sourceDF = TestBootstrap.generateTestRawTripDataset(timestamp, 0, 
numRecords, partitionPaths.asJava, jsc,
@@ -184,20 +149,7 @@ class TestDataSourceForBootstrap {
       .save(srcPath)
 
     // Perform bootstrap
-    val bootstrapDF = spark.emptyDataFrame
-    bootstrapDF.write
-      .format("hudi")
-      .options(commonOpts)
-      .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
-      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, 
DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
-      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
-      .option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP, srcPath)
-      .option(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS, 
"org.apache.hudi.keygen.SimpleKeyGenerator")
-      .mode(SaveMode.Overwrite)
-      .save(basePath)
-
-    val commitInstantTime1: String = HoodieDataSourceHelpers.latestCommit(fs, 
basePath)
-    assertEquals(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS, 
commitInstantTime1)
+    val commitInstantTime1 = 
runMetadataBootstrapAndVerifyCommit(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
 
     // Read bootstrapped table and verify count
     val hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*")
@@ -205,18 +157,14 @@ class TestDataSourceForBootstrap {
 
     // Perform upsert
     val updateTimestamp = Instant.now.toEpochMilli
-    val numRecordsUpdate = 10
     val updateDF = TestBootstrap.generateTestRawTripDataset(updateTimestamp, 
0, numRecordsUpdate, partitionPaths.asJava,
       jsc, spark.sqlContext)
 
     updateDF.write
       .format("hudi")
       .options(commonOpts)
-      .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
       .option(DataSourceWriteOptions.OPERATION_OPT_KEY, 
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
       .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, 
DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
-      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
-      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "timestamp")
       .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "datestr")
       // Required because source data is hive style partitioned
       .option(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, "true")
@@ -231,49 +179,14 @@ class TestDataSourceForBootstrap {
     assertEquals(numRecords, hoodieROViewDF2.count())
     assertEquals(numRecordsUpdate, hoodieROViewDF2.filter(s"timestamp == 
$updateTimestamp").count())
 
-    // incrementally pull only changes in the bootstrap commit, which would 
pull all the initial records written
-    // during bootstrap
-    val hoodieIncViewDF1 = spark.read.format("hudi")
-      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
-      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "000")
-      .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, 
commitInstantTime1)
-      .load(basePath)
-
-    assertEquals(numRecords, hoodieIncViewDF1.count())
-    var countsPerCommit = 
hoodieIncViewDF1.groupBy("_hoodie_commit_time").count().collect();
-    assertEquals(1, countsPerCommit.length)
-    assertEquals(commitInstantTime1, countsPerCommit(0).get(0))
-
-    // incrementally pull only changes in the latest commit, which would pull 
only the updated records in the
-    // latest commit
-    val hoodieIncViewDF2 = spark.read.format("hudi")
-      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
-      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, 
commitInstantTime1)
-      .load(basePath);
-
-    assertEquals(numRecordsUpdate, hoodieIncViewDF2.count())
-    countsPerCommit = 
hoodieIncViewDF2.groupBy("_hoodie_commit_time").count().collect();
-    assertEquals(1, countsPerCommit.length)
-    assertEquals(commitInstantTime2, countsPerCommit(0).get(0))
-
-    // pull the latest commit within certain partitions
-    val hoodieIncViewDF3 = spark.read.format("hudi")
-      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
-      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, 
commitInstantTime1)
-      .option(DataSourceReadOptions.INCR_PATH_GLOB_OPT_KEY, 
"/datestr=2020-04-02/*")
-      .load(basePath)
-
-    
assertEquals(hoodieIncViewDF2.filter(col("_hoodie_partition_path").contains("2020-04-02")).count(),
-      hoodieIncViewDF3.count())
+    verifyIncrementalViewResult(commitInstantTime1, commitInstantTime2, 
isPartitioned = true, isHiveStylePartitioned = true)
   }
 
   @Test def testMetadataBootstrapCOWPartitioned(): Unit = {
     val timestamp = Instant.now.toEpochMilli
-    val numRecords = 100
-    val partitionPaths = List("2020-04-01", "2020-04-02", "2020-04-03")
     val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext)
 
-    var sourceDF = TestBootstrap.generateTestRawTripDataset(timestamp, 0, 
numRecords, partitionPaths.asJava, jsc,
+    val sourceDF = TestBootstrap.generateTestRawTripDataset(timestamp, 0, 
numRecords, partitionPaths.asJava, jsc,
       spark.sqlContext)
 
     // Writing data for each partition instead of using partitionBy to avoid 
hive-style partitioning and hence
@@ -288,92 +201,56 @@ class TestDataSourceForBootstrap {
     })
 
     // Perform bootstrap
-    val bootstrapDF = spark.emptyDataFrame
-    bootstrapDF.write
-      .format("hudi")
-      .options(commonOpts)
-      .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
-      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, 
DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
-      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
-      .option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP, srcPath)
-      .option(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS, 
"org.apache.hudi.keygen.SimpleKeyGenerator")
-      .mode(SaveMode.Overwrite)
-      .save(basePath)
-
-    val commitInstantTime1: String = HoodieDataSourceHelpers.latestCommit(fs, 
basePath)
-    assertEquals(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS, 
commitInstantTime1)
+    val commitInstantTime1 = 
runMetadataBootstrapAndVerifyCommit(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
 
     // Read bootstrapped table and verify count
     val hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*")
     assertEquals(numRecords, hoodieROViewDF1.count())
 
-    // Perform upsert
-    val updateTimestamp = Instant.now.toEpochMilli
-    val numRecordsUpdate = 10
-    var updateDF = TestBootstrap.generateTestRawTripDataset(updateTimestamp, 
0, numRecordsUpdate, partitionPaths.asJava,
-      jsc, spark.sqlContext)
-
-    updateDF.write
+    // Perform upsert based on the written bootstrap table
+    val updateDf1 = hoodieROViewDF1.filter(col("_row_key") === 
verificationRowKey).withColumn(verificationCol, lit(updatedVerificationVal))
+    updateDf1.write
       .format("hudi")
       .options(commonOpts)
-      .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
-      .option("hoodie.upsert.shuffle.parallelism", "4")
       .option(DataSourceWriteOptions.OPERATION_OPT_KEY, 
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
       .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, 
DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
-      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
-      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "timestamp")
       .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "datestr")
       .mode(SaveMode.Append)
       .save(basePath)
 
-    val commitInstantTime2: String = HoodieDataSourceHelpers.latestCommit(fs, 
basePath)
+    // Read table after upsert and verify the updated value
     assertEquals(1, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, 
commitInstantTime1).size())
-
-    // Read table after upsert and verify count
     val hoodieROViewDF2 = spark.read.format("hudi").load(basePath + "/*")
-    assertEquals(numRecords, hoodieROViewDF2.count())
-    assertEquals(numRecordsUpdate, hoodieROViewDF2.filter(s"timestamp == 
$updateTimestamp").count())
-
-    // incrementally pull only changes in the bootstrap commit, which would 
pull all the initial records written
-    // during bootstrap
-    val hoodieIncViewDF1 = spark.read.format("hudi")
-      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
-      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "000")
-      .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, 
commitInstantTime1)
-      .load(basePath)
+    hoodieROViewDF2.collect()
+    assertEquals(updatedVerificationVal, 
hoodieROViewDF2.filter(col("_row_key") === 
verificationRowKey).select(verificationCol).first.getString(0))
 
-    assertEquals(numRecords, hoodieIncViewDF1.count())
-    var countsPerCommit = 
hoodieIncViewDF1.groupBy("_hoodie_commit_time").count().collect();
-    assertEquals(1, countsPerCommit.length)
-    assertEquals(commitInstantTime1, countsPerCommit(0).get(0))
+    // Perform upsert based on the source data
+    val updateTimestamp = Instant.now.toEpochMilli
+    val updateDF2 = TestBootstrap.generateTestRawTripDataset(updateTimestamp, 
0, numRecordsUpdate, partitionPaths.asJava,
+      jsc, spark.sqlContext)
 
-    // incrementally pull only changes in the latest commit, which would pull 
only the updated records in the
-    // latest commit
-    val hoodieIncViewDF2 = spark.read.format("hudi")
-      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
-      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, 
commitInstantTime1)
-      .load(basePath);
+    updateDF2.write
+      .format("hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, 
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
+      .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, 
DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
+      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "datestr")
+      .mode(SaveMode.Append)
+      .save(basePath)
 
-    assertEquals(numRecordsUpdate, hoodieIncViewDF2.count())
-    countsPerCommit = 
hoodieIncViewDF2.groupBy("_hoodie_commit_time").count().collect();
-    assertEquals(1, countsPerCommit.length)
-    assertEquals(commitInstantTime2, countsPerCommit(0).get(0))
+    val commitInstantTime3: String = HoodieDataSourceHelpers.latestCommit(fs, 
basePath)
+    assertEquals(2, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, 
commitInstantTime1).size())
 
-    // pull the latest commit within certain partitions
-    val hoodieIncViewDF3 = spark.read.format("hudi")
-      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
-      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, 
commitInstantTime1)
-      .option(DataSourceReadOptions.INCR_PATH_GLOB_OPT_KEY, "/2020-04-02/*")
-      .load(basePath)
+    // Read table after upsert and verify count
+    val hoodieROViewDF3 = spark.read.format("hudi").load(basePath + "/*")
+    assertEquals(numRecords, hoodieROViewDF3.count())
+    assertEquals(numRecordsUpdate, hoodieROViewDF3.filter(s"timestamp == 
$updateTimestamp").count())
 
-    
assertEquals(hoodieIncViewDF2.filter(col("_hoodie_partition_path").contains("2020-04-02")).count(),
-      hoodieIncViewDF3.count())
+    verifyIncrementalViewResult(commitInstantTime1, commitInstantTime3, 
isPartitioned = true, isHiveStylePartitioned = false)
   }
 
   @Test def testMetadataBootstrapMORPartitionedInlineCompactionOn(): Unit = {
     val timestamp = Instant.now.toEpochMilli
-    val numRecords = 100
-    val partitionPaths = List("2020-04-01", "2020-04-02", "2020-04-03")
     val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext)
 
     val sourceDF = TestBootstrap.generateTestRawTripDataset(timestamp, 0, 
numRecords, partitionPaths.asJava, jsc,
@@ -391,21 +268,7 @@ class TestDataSourceForBootstrap {
     })
 
     // Perform bootstrap
-    val bootstrapDF = spark.emptyDataFrame
-    bootstrapDF.write
-      .format("hudi")
-      .options(commonOpts)
-      .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
-      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, 
DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
-      .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, 
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
-      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
-      .option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP, srcPath)
-      .option(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS, 
"org.apache.hudi.keygen.SimpleKeyGenerator")
-      .mode(SaveMode.Overwrite)
-      .save(basePath)
-
-    val commitInstantTime1: String = HoodieDataSourceHelpers.latestCommit(fs, 
basePath)
-    assertEquals(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS, 
commitInstantTime1)
+    val commitInstantTime1 = 
runMetadataBootstrapAndVerifyCommit(DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
 
     // Read bootstrapped table and verify count
     val hoodieROViewDF1 = spark.read.format("hudi")
@@ -416,18 +279,14 @@ class TestDataSourceForBootstrap {
 
     // Perform upsert
     val updateTimestamp = Instant.now.toEpochMilli
-    val numRecordsUpdate = 10
     val updateDF = TestBootstrap.generateTestRawTripDataset(updateTimestamp, 
0, numRecordsUpdate, partitionPaths.asJava,
       jsc, spark.sqlContext)
 
     updateDF.write
       .format("hudi")
       .options(commonOpts)
-      .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
       .option(DataSourceWriteOptions.OPERATION_OPT_KEY, 
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
       .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, 
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
-      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
-      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "timestamp")
       .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "datestr")
       .option(HoodieCompactionConfig.INLINE_COMPACT_PROP, "true")
       .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP, 
"1")
@@ -449,8 +308,6 @@ class TestDataSourceForBootstrap {
 
   @Test def testMetadataBootstrapMORPartitioned(): Unit = {
     val timestamp = Instant.now.toEpochMilli
-    val numRecords = 100
-    val partitionPaths = List("2020-04-01", "2020-04-02", "2020-04-03")
     val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext)
 
     val sourceDF = TestBootstrap.generateTestRawTripDataset(timestamp, 0, 
numRecords, partitionPaths.asJava, jsc,
@@ -468,64 +325,64 @@ class TestDataSourceForBootstrap {
     })
 
     // Perform bootstrap
-    val bootstrapDF = spark.emptyDataFrame
-    bootstrapDF.write
+    val commitInstantTime1 = 
runMetadataBootstrapAndVerifyCommit(DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+
+    // Read bootstrapped table and verify count
+    val hoodieROViewDF1 = spark.read.format("hudi")
+                            .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY,
+                              
DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
+                            .load(basePath + "/*")
+    assertEquals(numRecords, hoodieROViewDF1.count())
+
+    // Perform upsert based on the written bootstrap table
+    val updateDf1 = hoodieROViewDF1.filter(col("_row_key") === 
verificationRowKey).withColumn(verificationCol, lit(updatedVerificationVal))
+    updateDf1.write
       .format("hudi")
       .options(commonOpts)
-      .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
-      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, 
DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
+      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, 
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
       .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, 
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
-      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
-      .option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP, srcPath)
-      .option(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS, 
"org.apache.hudi.keygen.SimpleKeyGenerator")
-      .mode(SaveMode.Overwrite)
+      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "datestr")
+      .mode(SaveMode.Append)
       .save(basePath)
 
-    val commitInstantTime1: String = HoodieDataSourceHelpers.latestCommit(fs, 
basePath)
-    assertEquals(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS, 
commitInstantTime1)
-
-    // Read bootstrapped table and verify count
-    val hoodieROViewDF1 = spark.read.format("hudi")
+    // Read table after upsert and verify the value
+    assertEquals(1, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, 
commitInstantTime1).size())
+    val hoodieROViewDF2 = spark.read.format("hudi")
                             .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY,
                               
DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
                             .load(basePath + "/*")
-    assertEquals(numRecords, hoodieROViewDF1.count())
+    hoodieROViewDF2.collect()
+    assertEquals(originalVerificationVal, 
hoodieROViewDF2.filter(col("_row_key") === 
verificationRowKey).select(verificationCol).first.getString(0))
 
-    // Perform upsert
+    // Perform upsert based on the source data
     val updateTimestamp = Instant.now.toEpochMilli
-    val numRecordsUpdate = 10
-    val updateDF = TestBootstrap.generateTestRawTripDataset(updateTimestamp, 
0, numRecordsUpdate,
+    val updateDF2 = TestBootstrap.generateTestRawTripDataset(updateTimestamp, 
0, numRecordsUpdate,
       partitionPaths.asJava, jsc, spark.sqlContext)
 
-    updateDF.write
+    updateDF2.write
       .format("hudi")
       .options(commonOpts)
-      .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
       .option(DataSourceWriteOptions.OPERATION_OPT_KEY, 
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
       .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, 
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
-      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
-      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "timestamp")
       .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "datestr")
       .mode(SaveMode.Append)
       .save(basePath)
 
-    // Expect 1 new commit since meta bootstrap - delta commit (because inline 
compaction is off)
-    assertEquals(1, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, 
commitInstantTime1).size())
+    // Expect 2 new commit since meta bootstrap - 2 delta commits (because 
inline compaction is off)
+    assertEquals(2, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, 
commitInstantTime1).size())
 
     // Read table after upsert and verify count. Since we have inline 
compaction off the RO view will have
     // no updated rows.
-    val hoodieROViewDF2 = spark.read.format("hudi")
+    val hoodieROViewDF3 = spark.read.format("hudi")
                             .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY,
                               
DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
                             .load(basePath + "/*")
-    assertEquals(numRecords, hoodieROViewDF2.count())
-    assertEquals(0, hoodieROViewDF2.filter(s"timestamp == 
$updateTimestamp").count())
+    assertEquals(numRecords, hoodieROViewDF3.count())
+    assertEquals(0, hoodieROViewDF3.filter(s"timestamp == 
$updateTimestamp").count())
   }
 
   @Test def testFullBootstrapCOWPartitioned(): Unit = {
     val timestamp = Instant.now.toEpochMilli
-    val numRecords = 100
-    val partitionPaths = List("2020-04-01", "2020-04-02", "2020-04-03")
     val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext)
 
     val sourceDF = TestBootstrap.generateTestRawTripDataset(timestamp, 0, 
numRecords, partitionPaths.asJava, jsc,
@@ -547,11 +404,8 @@ class TestDataSourceForBootstrap {
     bootstrapDF.write
       .format("hudi")
       .options(commonOpts)
-      .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
       .option(DataSourceWriteOptions.OPERATION_OPT_KEY, 
DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
-      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
       .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "datestr")
-      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "timestamp")
       .option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP, srcPath)
       .option(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS, 
classOf[SimpleKeyGenerator].getName)
       .option(HoodieBootstrapConfig.BOOTSTRAP_MODE_SELECTOR, 
classOf[FullRecordBootstrapModeSelector].getName)
@@ -568,18 +422,14 @@ class TestDataSourceForBootstrap {
 
     // Perform upsert
     val updateTimestamp = Instant.now.toEpochMilli
-    val numRecordsUpdate = 10
     val updateDF = TestBootstrap.generateTestRawTripDataset(updateTimestamp, 
0, numRecordsUpdate, partitionPaths.asJava,
       jsc, spark.sqlContext)
 
     updateDF.write
       .format("hudi")
       .options(commonOpts)
-      .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
       .option(DataSourceWriteOptions.OPERATION_OPT_KEY, 
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
       .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, 
DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
-      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
-      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "timestamp")
       .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "datestr")
       .mode(SaveMode.Append)
       .save(basePath)
@@ -592,39 +442,64 @@ class TestDataSourceForBootstrap {
     assertEquals(numRecords, hoodieROViewDF2.count())
     assertEquals(numRecordsUpdate, hoodieROViewDF2.filter(s"timestamp == 
$updateTimestamp").count())
 
+    verifyIncrementalViewResult(commitInstantTime1, commitInstantTime2, 
isPartitioned = true, isHiveStylePartitioned = false)
+  }
+
+  def runMetadataBootstrapAndVerifyCommit(tableType: String): String = {
+    val bootstrapDF = spark.emptyDataFrame
+    bootstrapDF.write
+      .format("hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, 
DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
+      .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, tableType)
+      .option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP, srcPath)
+      .option(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS, 
classOf[SimpleKeyGenerator].getName)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    val commitInstantTime1: String = HoodieDataSourceHelpers.latestCommit(fs, 
basePath)
+    assertEquals(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS, 
commitInstantTime1)
+    commitInstantTime1
+  }
+
+  def verifyIncrementalViewResult(bootstrapCommitInstantTime: String, 
latestCommitInstantTime: String,
+                                  isPartitioned: Boolean, 
isHiveStylePartitioned: Boolean): Unit = {
     // incrementally pull only changes in the bootstrap commit, which would 
pull all the initial records written
     // during bootstrap
     val hoodieIncViewDF1 = spark.read.format("hudi")
       .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
       .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "000")
-      .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, 
commitInstantTime1)
+      .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, 
bootstrapCommitInstantTime)
       .load(basePath)
 
     assertEquals(numRecords, hoodieIncViewDF1.count())
     var countsPerCommit = 
hoodieIncViewDF1.groupBy("_hoodie_commit_time").count().collect();
     assertEquals(1, countsPerCommit.length)
-    assertEquals(commitInstantTime1, countsPerCommit(0).get(0))
+    assertEquals(bootstrapCommitInstantTime, countsPerCommit(0).get(0))
 
-    // incrementally pull only changes in the latest commit, which would pull 
only the updated records in the
-    // latest commit
+    // incrementally pull only changes after bootstrap commit, which would 
pull only the updated records in the
+    // later commits
     val hoodieIncViewDF2 = spark.read.format("hudi")
       .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
-      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, 
commitInstantTime1)
+      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, 
bootstrapCommitInstantTime)
       .load(basePath);
 
     assertEquals(numRecordsUpdate, hoodieIncViewDF2.count())
     countsPerCommit = 
hoodieIncViewDF2.groupBy("_hoodie_commit_time").count().collect();
     assertEquals(1, countsPerCommit.length)
-    assertEquals(commitInstantTime2, countsPerCommit(0).get(0))
-
-    // pull the latest commit within certain partitions
-    val hoodieIncViewDF3 = spark.read.format("hudi")
-      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
-      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, 
commitInstantTime1)
-      .option(DataSourceReadOptions.INCR_PATH_GLOB_OPT_KEY, "/2020-04-02/*")
-      .load(basePath)
-
-    
assertEquals(hoodieIncViewDF2.filter(col("_hoodie_partition_path").contains("2020-04-02")).count(),
-      hoodieIncViewDF3.count())
+    assertEquals(latestCommitInstantTime, countsPerCommit(0).get(0))
+
+    if (isPartitioned) {
+      val relativePartitionPath = if (isHiveStylePartitioned) 
"/datestr=2020-04-02/*" else "/2020-04-02/*"
+      // pull the update commits within certain partitions
+      val hoodieIncViewDF3 = spark.read.format("hudi")
+        .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+        .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, 
bootstrapCommitInstantTime)
+        .option(DataSourceReadOptions.INCR_PATH_GLOB_OPT_KEY, 
relativePartitionPath)
+        .load(basePath)
+
+      
assertEquals(hoodieIncViewDF2.filter(col("_hoodie_partition_path").contains("2020-04-02")).count(),
+        hoodieIncViewDF3.count())
+    }
   }
 }

Reply via email to