This is an automated email from the ASF dual-hosted git repository.
vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 89f482e [HUDI-1489] Fix null pointer exception when reading updated
written bootstrap table (#2370)
89f482e is described below
commit 89f482eaf249bf89b7182c4ca97ffda0929b4b7e
Author: wenningd <[email protected]>
AuthorDate: Wed Dec 23 11:26:24 2020 -0800
[HUDI-1489] Fix null pointer exception when reading updated written
bootstrap table (#2370)
Co-authored-by: Wenning Ding <[email protected]>
---
.../org/apache/hudi/HoodieBootstrapRelation.scala | 6 +-
.../functional/TestDataSourceForBootstrap.scala | 351 +++++++--------------
2 files changed, 118 insertions(+), 239 deletions(-)
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala
index a1e9947..f7415f9 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala
@@ -91,6 +91,9 @@ class HoodieBootstrapRelation(@transient val _sqlContext:
SQLContext,
// Get required schemas for column pruning
var requiredDataSchema = StructType(Seq())
var requiredSkeletonSchema = StructType(Seq())
+ // requiredColsSchema is the schema of requiredColumns. Note that
requiredColumns may arrive in an arbitrary order,
+ // so requiredColsSchema is not always equal to
(requiredSkeletonSchema.fields ++ requiredDataSchema.fields)
+ var requiredColsSchema = StructType(Seq())
requiredColumns.foreach(col => {
var field = dataSchema.find(_.name == col)
if (field.isDefined) {
@@ -99,6 +102,7 @@ class HoodieBootstrapRelation(@transient val _sqlContext:
SQLContext,
field = skeletonSchema.find(_.name == col)
requiredSkeletonSchema = requiredSkeletonSchema.add(field.get)
}
+ requiredColsSchema = requiredColsSchema.add(field.get)
})
// Prepare readers for reading data file and skeleton files
@@ -129,7 +133,7 @@ class HoodieBootstrapRelation(@transient val _sqlContext:
SQLContext,
sparkSession = _sqlContext.sparkSession,
dataSchema = fullSchema,
partitionSchema = StructType(Seq.empty),
- requiredSchema = StructType(requiredSkeletonSchema.fields ++
requiredDataSchema.fields),
+ requiredSchema = requiredColsSchema,
filters = filters,
options = Map.empty,
hadoopConf = _sqlContext.sparkSession.sessionState.newHadoopConf())
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
index f24e5ad..2a6a0a7 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
@@ -17,9 +17,6 @@
package org.apache.hudi.functional
-import java.time.Instant
-import java.util.Collections
-
import collection.JavaConverters._
import org.apache.hadoop.fs.FileSystem
import org.apache.hudi.bootstrap.SparkParquetBootstrapDataProvider
@@ -37,10 +34,13 @@ import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.api.io.TempDir
+import java.time.Instant
+import java.util.Collections
+
class TestDataSourceForBootstrap {
var spark: SparkSession = _
- val commonOpts = Map(
+ val commonOpts: Map[String, String] = Map(
HoodieWriteConfig.INSERT_PARALLELISM -> "4",
HoodieWriteConfig.UPSERT_PARALLELISM -> "4",
HoodieWriteConfig.DELETE_PARALLELISM -> "4",
@@ -56,6 +56,14 @@ class TestDataSourceForBootstrap {
var srcPath: String = _
var fs: FileSystem = _
+ val partitionPaths: List[String] = List("2020-04-01", "2020-04-02",
"2020-04-03")
+ val numRecords: Int = 100
+ val numRecordsUpdate: Int = 10
+ val verificationRowKey: String = "trip_0"
+ val verificationCol: String = "driver"
+ val originalVerificationVal: String = "driver_0"
+ val updatedVerificationVal: String = "driver_update"
+
@BeforeEach def initialize(@TempDir tempDir: java.nio.file.Path) {
spark = SparkSession.builder
.appName("Hoodie Datasource test")
@@ -83,7 +91,6 @@ class TestDataSourceForBootstrap {
@Test def testMetadataBootstrapCOWNonPartitioned(): Unit = {
val timestamp = Instant.now.toEpochMilli
- val numRecords = 100
val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext)
val sourceDF = TestBootstrap.generateTestRawTripDataset(timestamp, 0,
numRecords, Collections.emptyList(), jsc,
@@ -96,20 +103,7 @@ class TestDataSourceForBootstrap {
.save(srcPath)
// Perform bootstrap
- val bootstrapDF = spark.emptyDataFrame
- bootstrapDF.write
- .format("hudi")
- .options(commonOpts)
- .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
- .option(DataSourceWriteOptions.OPERATION_OPT_KEY,
DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
- .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
- .option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP, srcPath)
- .option(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS,
"org.apache.hudi.keygen.NonpartitionedKeyGenerator")
- .mode(SaveMode.Overwrite)
- .save(basePath)
-
- val commitInstantTime1: String = HoodieDataSourceHelpers.latestCommit(fs,
basePath)
- assertEquals(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS,
commitInstantTime1)
+ val commitInstantTime1 =
runMetadataBootstrapAndVerifyCommit(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
// Read bootstrapped table and verify count
var hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*")
@@ -117,18 +111,14 @@ class TestDataSourceForBootstrap {
// Perform upsert
val updateTimestamp = Instant.now.toEpochMilli
- val numRecordsUpdate = 10
val updateDF = TestBootstrap.generateTestRawTripDataset(updateTimestamp,
0, numRecordsUpdate,
Collections.emptyList(), jsc, spark.sqlContext)
updateDF.write
.format("hudi")
.options(commonOpts)
- .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
.option(DataSourceWriteOptions.OPERATION_OPT_KEY,
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
.option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY,
DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
- .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
- .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "timestamp")
.option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY,
"org.apache.hudi.keygen.NonpartitionedKeyGenerator")
.mode(SaveMode.Append)
.save(basePath)
@@ -141,36 +131,11 @@ class TestDataSourceForBootstrap {
assertEquals(numRecords, hoodieROViewDF1.count())
assertEquals(numRecordsUpdate, hoodieROViewDF1.filter(s"timestamp ==
$updateTimestamp").count())
- // incrementally pull only changes in the bootstrap commit, which would
pull all the initial records written
- // during bootstrap
- val hoodieIncViewDF1 = spark.read.format("hudi")
- .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY,
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
- .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "000")
- .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY,
commitInstantTime1)
- .load(basePath)
-
- assertEquals(numRecords, hoodieIncViewDF1.count())
- var countsPerCommit =
hoodieIncViewDF1.groupBy("_hoodie_commit_time").count().collect();
- assertEquals(1, countsPerCommit.length)
- assertEquals(commitInstantTime1, countsPerCommit(0).get(0))
-
- // incrementally pull only changes in the latest commit, which would pull
only the updated records in the
- // latest commit
- val hoodieIncViewDF2 = spark.read.format("hudi")
- .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY,
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
- .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY,
commitInstantTime1)
- .load(basePath);
-
- assertEquals(numRecordsUpdate, hoodieIncViewDF2.count())
- countsPerCommit =
hoodieIncViewDF2.groupBy("_hoodie_commit_time").count().collect();
- assertEquals(1, countsPerCommit.length)
- assertEquals(commitInstantTime2, countsPerCommit(0).get(0))
+ verifyIncrementalViewResult(commitInstantTime1, commitInstantTime2,
isPartitioned = false, isHiveStylePartitioned = false)
}
@Test def testMetadataBootstrapCOWHiveStylePartitioned(): Unit = {
val timestamp = Instant.now.toEpochMilli
- val numRecords = 100
- val partitionPaths = List("2020-04-01", "2020-04-02", "2020-04-03")
val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext)
val sourceDF = TestBootstrap.generateTestRawTripDataset(timestamp, 0,
numRecords, partitionPaths.asJava, jsc,
@@ -184,20 +149,7 @@ class TestDataSourceForBootstrap {
.save(srcPath)
// Perform bootstrap
- val bootstrapDF = spark.emptyDataFrame
- bootstrapDF.write
- .format("hudi")
- .options(commonOpts)
- .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
- .option(DataSourceWriteOptions.OPERATION_OPT_KEY,
DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
- .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
- .option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP, srcPath)
- .option(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS,
"org.apache.hudi.keygen.SimpleKeyGenerator")
- .mode(SaveMode.Overwrite)
- .save(basePath)
-
- val commitInstantTime1: String = HoodieDataSourceHelpers.latestCommit(fs,
basePath)
- assertEquals(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS,
commitInstantTime1)
+ val commitInstantTime1 =
runMetadataBootstrapAndVerifyCommit(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
// Read bootstrapped table and verify count
val hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*")
@@ -205,18 +157,14 @@ class TestDataSourceForBootstrap {
// Perform upsert
val updateTimestamp = Instant.now.toEpochMilli
- val numRecordsUpdate = 10
val updateDF = TestBootstrap.generateTestRawTripDataset(updateTimestamp,
0, numRecordsUpdate, partitionPaths.asJava,
jsc, spark.sqlContext)
updateDF.write
.format("hudi")
.options(commonOpts)
- .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
.option(DataSourceWriteOptions.OPERATION_OPT_KEY,
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
.option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY,
DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
- .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
- .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "timestamp")
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "datestr")
// Required because source data is hive style partitioned
.option(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, "true")
@@ -231,49 +179,14 @@ class TestDataSourceForBootstrap {
assertEquals(numRecords, hoodieROViewDF2.count())
assertEquals(numRecordsUpdate, hoodieROViewDF2.filter(s"timestamp ==
$updateTimestamp").count())
- // incrementally pull only changes in the bootstrap commit, which would
pull all the initial records written
- // during bootstrap
- val hoodieIncViewDF1 = spark.read.format("hudi")
- .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY,
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
- .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "000")
- .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY,
commitInstantTime1)
- .load(basePath)
-
- assertEquals(numRecords, hoodieIncViewDF1.count())
- var countsPerCommit =
hoodieIncViewDF1.groupBy("_hoodie_commit_time").count().collect();
- assertEquals(1, countsPerCommit.length)
- assertEquals(commitInstantTime1, countsPerCommit(0).get(0))
-
- // incrementally pull only changes in the latest commit, which would pull
only the updated records in the
- // latest commit
- val hoodieIncViewDF2 = spark.read.format("hudi")
- .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY,
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
- .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY,
commitInstantTime1)
- .load(basePath);
-
- assertEquals(numRecordsUpdate, hoodieIncViewDF2.count())
- countsPerCommit =
hoodieIncViewDF2.groupBy("_hoodie_commit_time").count().collect();
- assertEquals(1, countsPerCommit.length)
- assertEquals(commitInstantTime2, countsPerCommit(0).get(0))
-
- // pull the latest commit within certain partitions
- val hoodieIncViewDF3 = spark.read.format("hudi")
- .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY,
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
- .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY,
commitInstantTime1)
- .option(DataSourceReadOptions.INCR_PATH_GLOB_OPT_KEY,
"/datestr=2020-04-02/*")
- .load(basePath)
-
-
assertEquals(hoodieIncViewDF2.filter(col("_hoodie_partition_path").contains("2020-04-02")).count(),
- hoodieIncViewDF3.count())
+ verifyIncrementalViewResult(commitInstantTime1, commitInstantTime2,
isPartitioned = true, isHiveStylePartitioned = true)
}
@Test def testMetadataBootstrapCOWPartitioned(): Unit = {
val timestamp = Instant.now.toEpochMilli
- val numRecords = 100
- val partitionPaths = List("2020-04-01", "2020-04-02", "2020-04-03")
val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext)
- var sourceDF = TestBootstrap.generateTestRawTripDataset(timestamp, 0,
numRecords, partitionPaths.asJava, jsc,
+ val sourceDF = TestBootstrap.generateTestRawTripDataset(timestamp, 0,
numRecords, partitionPaths.asJava, jsc,
spark.sqlContext)
// Writing data for each partition instead of using partitionBy to avoid
hive-style partitioning and hence
@@ -288,92 +201,56 @@ class TestDataSourceForBootstrap {
})
// Perform bootstrap
- val bootstrapDF = spark.emptyDataFrame
- bootstrapDF.write
- .format("hudi")
- .options(commonOpts)
- .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
- .option(DataSourceWriteOptions.OPERATION_OPT_KEY,
DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
- .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
- .option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP, srcPath)
- .option(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS,
"org.apache.hudi.keygen.SimpleKeyGenerator")
- .mode(SaveMode.Overwrite)
- .save(basePath)
-
- val commitInstantTime1: String = HoodieDataSourceHelpers.latestCommit(fs,
basePath)
- assertEquals(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS,
commitInstantTime1)
+ val commitInstantTime1 =
runMetadataBootstrapAndVerifyCommit(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
// Read bootstrapped table and verify count
val hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*")
assertEquals(numRecords, hoodieROViewDF1.count())
- // Perform upsert
- val updateTimestamp = Instant.now.toEpochMilli
- val numRecordsUpdate = 10
- var updateDF = TestBootstrap.generateTestRawTripDataset(updateTimestamp,
0, numRecordsUpdate, partitionPaths.asJava,
- jsc, spark.sqlContext)
-
- updateDF.write
+ // Perform upsert based on the written bootstrap table
+ val updateDf1 = hoodieROViewDF1.filter(col("_row_key") ===
verificationRowKey).withColumn(verificationCol, lit(updatedVerificationVal))
+ updateDf1.write
.format("hudi")
.options(commonOpts)
- .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
- .option("hoodie.upsert.shuffle.parallelism", "4")
.option(DataSourceWriteOptions.OPERATION_OPT_KEY,
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
.option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY,
DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
- .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
- .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "timestamp")
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "datestr")
.mode(SaveMode.Append)
.save(basePath)
- val commitInstantTime2: String = HoodieDataSourceHelpers.latestCommit(fs,
basePath)
+ // Read table after upsert and verify the updated value
assertEquals(1, HoodieDataSourceHelpers.listCommitsSince(fs, basePath,
commitInstantTime1).size())
-
- // Read table after upsert and verify count
val hoodieROViewDF2 = spark.read.format("hudi").load(basePath + "/*")
- assertEquals(numRecords, hoodieROViewDF2.count())
- assertEquals(numRecordsUpdate, hoodieROViewDF2.filter(s"timestamp ==
$updateTimestamp").count())
-
- // incrementally pull only changes in the bootstrap commit, which would
pull all the initial records written
- // during bootstrap
- val hoodieIncViewDF1 = spark.read.format("hudi")
- .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY,
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
- .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "000")
- .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY,
commitInstantTime1)
- .load(basePath)
+ hoodieROViewDF2.collect()
+ assertEquals(updatedVerificationVal,
hoodieROViewDF2.filter(col("_row_key") ===
verificationRowKey).select(verificationCol).first.getString(0))
- assertEquals(numRecords, hoodieIncViewDF1.count())
- var countsPerCommit =
hoodieIncViewDF1.groupBy("_hoodie_commit_time").count().collect();
- assertEquals(1, countsPerCommit.length)
- assertEquals(commitInstantTime1, countsPerCommit(0).get(0))
+ // Perform upsert based on the source data
+ val updateTimestamp = Instant.now.toEpochMilli
+ val updateDF2 = TestBootstrap.generateTestRawTripDataset(updateTimestamp,
0, numRecordsUpdate, partitionPaths.asJava,
+ jsc, spark.sqlContext)
- // incrementally pull only changes in the latest commit, which would pull
only the updated records in the
- // latest commit
- val hoodieIncViewDF2 = spark.read.format("hudi")
- .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY,
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
- .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY,
commitInstantTime1)
- .load(basePath);
+ updateDF2.write
+ .format("hudi")
+ .options(commonOpts)
+ .option(DataSourceWriteOptions.OPERATION_OPT_KEY,
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
+ .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY,
DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
+ .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "datestr")
+ .mode(SaveMode.Append)
+ .save(basePath)
- assertEquals(numRecordsUpdate, hoodieIncViewDF2.count())
- countsPerCommit =
hoodieIncViewDF2.groupBy("_hoodie_commit_time").count().collect();
- assertEquals(1, countsPerCommit.length)
- assertEquals(commitInstantTime2, countsPerCommit(0).get(0))
+ val commitInstantTime3: String = HoodieDataSourceHelpers.latestCommit(fs,
basePath)
+ assertEquals(2, HoodieDataSourceHelpers.listCommitsSince(fs, basePath,
commitInstantTime1).size())
- // pull the latest commit within certain partitions
- val hoodieIncViewDF3 = spark.read.format("hudi")
- .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY,
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
- .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY,
commitInstantTime1)
- .option(DataSourceReadOptions.INCR_PATH_GLOB_OPT_KEY, "/2020-04-02/*")
- .load(basePath)
+ // Read table after upsert and verify count
+ val hoodieROViewDF3 = spark.read.format("hudi").load(basePath + "/*")
+ assertEquals(numRecords, hoodieROViewDF3.count())
+ assertEquals(numRecordsUpdate, hoodieROViewDF3.filter(s"timestamp ==
$updateTimestamp").count())
-
assertEquals(hoodieIncViewDF2.filter(col("_hoodie_partition_path").contains("2020-04-02")).count(),
- hoodieIncViewDF3.count())
+ verifyIncrementalViewResult(commitInstantTime1, commitInstantTime3,
isPartitioned = true, isHiveStylePartitioned = false)
}
@Test def testMetadataBootstrapMORPartitionedInlineCompactionOn(): Unit = {
val timestamp = Instant.now.toEpochMilli
- val numRecords = 100
- val partitionPaths = List("2020-04-01", "2020-04-02", "2020-04-03")
val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext)
val sourceDF = TestBootstrap.generateTestRawTripDataset(timestamp, 0,
numRecords, partitionPaths.asJava, jsc,
@@ -391,21 +268,7 @@ class TestDataSourceForBootstrap {
})
// Perform bootstrap
- val bootstrapDF = spark.emptyDataFrame
- bootstrapDF.write
- .format("hudi")
- .options(commonOpts)
- .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
- .option(DataSourceWriteOptions.OPERATION_OPT_KEY,
DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
- .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY,
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
- .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
- .option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP, srcPath)
- .option(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS,
"org.apache.hudi.keygen.SimpleKeyGenerator")
- .mode(SaveMode.Overwrite)
- .save(basePath)
-
- val commitInstantTime1: String = HoodieDataSourceHelpers.latestCommit(fs,
basePath)
- assertEquals(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS,
commitInstantTime1)
+ val commitInstantTime1 =
runMetadataBootstrapAndVerifyCommit(DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
// Read bootstrapped table and verify count
val hoodieROViewDF1 = spark.read.format("hudi")
@@ -416,18 +279,14 @@ class TestDataSourceForBootstrap {
// Perform upsert
val updateTimestamp = Instant.now.toEpochMilli
- val numRecordsUpdate = 10
val updateDF = TestBootstrap.generateTestRawTripDataset(updateTimestamp,
0, numRecordsUpdate, partitionPaths.asJava,
jsc, spark.sqlContext)
updateDF.write
.format("hudi")
.options(commonOpts)
- .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
.option(DataSourceWriteOptions.OPERATION_OPT_KEY,
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
.option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY,
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
- .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
- .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "timestamp")
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "datestr")
.option(HoodieCompactionConfig.INLINE_COMPACT_PROP, "true")
.option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP,
"1")
@@ -449,8 +308,6 @@ class TestDataSourceForBootstrap {
@Test def testMetadataBootstrapMORPartitioned(): Unit = {
val timestamp = Instant.now.toEpochMilli
- val numRecords = 100
- val partitionPaths = List("2020-04-01", "2020-04-02", "2020-04-03")
val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext)
val sourceDF = TestBootstrap.generateTestRawTripDataset(timestamp, 0,
numRecords, partitionPaths.asJava, jsc,
@@ -468,64 +325,64 @@ class TestDataSourceForBootstrap {
})
// Perform bootstrap
- val bootstrapDF = spark.emptyDataFrame
- bootstrapDF.write
+ val commitInstantTime1 =
runMetadataBootstrapAndVerifyCommit(DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+
+ // Read bootstrapped table and verify count
+ val hoodieROViewDF1 = spark.read.format("hudi")
+ .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY,
+
DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
+ .load(basePath + "/*")
+ assertEquals(numRecords, hoodieROViewDF1.count())
+
+ // Perform upsert based on the written bootstrap table
+ val updateDf1 = hoodieROViewDF1.filter(col("_row_key") ===
verificationRowKey).withColumn(verificationCol, lit(updatedVerificationVal))
+ updateDf1.write
.format("hudi")
.options(commonOpts)
- .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
- .option(DataSourceWriteOptions.OPERATION_OPT_KEY,
DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
+ .option(DataSourceWriteOptions.OPERATION_OPT_KEY,
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
.option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY,
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
- .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
- .option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP, srcPath)
- .option(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS,
"org.apache.hudi.keygen.SimpleKeyGenerator")
- .mode(SaveMode.Overwrite)
+ .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "datestr")
+ .mode(SaveMode.Append)
.save(basePath)
- val commitInstantTime1: String = HoodieDataSourceHelpers.latestCommit(fs,
basePath)
- assertEquals(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS,
commitInstantTime1)
-
- // Read bootstrapped table and verify count
- val hoodieROViewDF1 = spark.read.format("hudi")
+ // Read table after upsert and verify the value
+ assertEquals(1, HoodieDataSourceHelpers.listCommitsSince(fs, basePath,
commitInstantTime1).size())
+ val hoodieROViewDF2 = spark.read.format("hudi")
.option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY,
DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
.load(basePath + "/*")
- assertEquals(numRecords, hoodieROViewDF1.count())
+ hoodieROViewDF2.collect()
+ assertEquals(originalVerificationVal,
hoodieROViewDF2.filter(col("_row_key") ===
verificationRowKey).select(verificationCol).first.getString(0))
- // Perform upsert
+ // Perform upsert based on the source data
val updateTimestamp = Instant.now.toEpochMilli
- val numRecordsUpdate = 10
- val updateDF = TestBootstrap.generateTestRawTripDataset(updateTimestamp,
0, numRecordsUpdate,
+ val updateDF2 = TestBootstrap.generateTestRawTripDataset(updateTimestamp,
0, numRecordsUpdate,
partitionPaths.asJava, jsc, spark.sqlContext)
- updateDF.write
+ updateDF2.write
.format("hudi")
.options(commonOpts)
- .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
.option(DataSourceWriteOptions.OPERATION_OPT_KEY,
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
.option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY,
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
- .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
- .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "timestamp")
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "datestr")
.mode(SaveMode.Append)
.save(basePath)
- // Expect 1 new commit since meta bootstrap - delta commit (because inline
compaction is off)
- assertEquals(1, HoodieDataSourceHelpers.listCommitsSince(fs, basePath,
commitInstantTime1).size())
+ // Expect 2 new commits since meta bootstrap - 2 delta commits (because
inline compaction is off)
+ assertEquals(2, HoodieDataSourceHelpers.listCommitsSince(fs, basePath,
commitInstantTime1).size())
// Read table after upsert and verify count. Since we have inline
compaction off the RO view will have
// no updated rows.
- val hoodieROViewDF2 = spark.read.format("hudi")
+ val hoodieROViewDF3 = spark.read.format("hudi")
.option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY,
DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
.load(basePath + "/*")
- assertEquals(numRecords, hoodieROViewDF2.count())
- assertEquals(0, hoodieROViewDF2.filter(s"timestamp ==
$updateTimestamp").count())
+ assertEquals(numRecords, hoodieROViewDF3.count())
+ assertEquals(0, hoodieROViewDF3.filter(s"timestamp ==
$updateTimestamp").count())
}
@Test def testFullBootstrapCOWPartitioned(): Unit = {
val timestamp = Instant.now.toEpochMilli
- val numRecords = 100
- val partitionPaths = List("2020-04-01", "2020-04-02", "2020-04-03")
val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext)
val sourceDF = TestBootstrap.generateTestRawTripDataset(timestamp, 0,
numRecords, partitionPaths.asJava, jsc,
@@ -547,11 +404,8 @@ class TestDataSourceForBootstrap {
bootstrapDF.write
.format("hudi")
.options(commonOpts)
- .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
.option(DataSourceWriteOptions.OPERATION_OPT_KEY,
DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
- .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "datestr")
- .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "timestamp")
.option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP, srcPath)
.option(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS,
classOf[SimpleKeyGenerator].getName)
.option(HoodieBootstrapConfig.BOOTSTRAP_MODE_SELECTOR,
classOf[FullRecordBootstrapModeSelector].getName)
@@ -568,18 +422,14 @@ class TestDataSourceForBootstrap {
// Perform upsert
val updateTimestamp = Instant.now.toEpochMilli
- val numRecordsUpdate = 10
val updateDF = TestBootstrap.generateTestRawTripDataset(updateTimestamp,
0, numRecordsUpdate, partitionPaths.asJava,
jsc, spark.sqlContext)
updateDF.write
.format("hudi")
.options(commonOpts)
- .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
.option(DataSourceWriteOptions.OPERATION_OPT_KEY,
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
.option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY,
DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
- .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
- .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "timestamp")
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "datestr")
.mode(SaveMode.Append)
.save(basePath)
@@ -592,39 +442,64 @@ class TestDataSourceForBootstrap {
assertEquals(numRecords, hoodieROViewDF2.count())
assertEquals(numRecordsUpdate, hoodieROViewDF2.filter(s"timestamp ==
$updateTimestamp").count())
+ verifyIncrementalViewResult(commitInstantTime1, commitInstantTime2,
isPartitioned = true, isHiveStylePartitioned = false)
+ }
+
+ def runMetadataBootstrapAndVerifyCommit(tableType: String): String = {
+ val bootstrapDF = spark.emptyDataFrame
+ bootstrapDF.write
+ .format("hudi")
+ .options(commonOpts)
+ .option(DataSourceWriteOptions.OPERATION_OPT_KEY,
DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
+ .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, tableType)
+ .option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP, srcPath)
+ .option(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS,
classOf[SimpleKeyGenerator].getName)
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+
+ val commitInstantTime1: String = HoodieDataSourceHelpers.latestCommit(fs,
basePath)
+ assertEquals(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS,
commitInstantTime1)
+ commitInstantTime1
+ }
+
+ def verifyIncrementalViewResult(bootstrapCommitInstantTime: String,
latestCommitInstantTime: String,
+ isPartitioned: Boolean,
isHiveStylePartitioned: Boolean): Unit = {
// incrementally pull only changes in the bootstrap commit, which would
pull all the initial records written
// during bootstrap
val hoodieIncViewDF1 = spark.read.format("hudi")
.option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY,
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
.option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "000")
- .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY,
commitInstantTime1)
+ .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY,
bootstrapCommitInstantTime)
.load(basePath)
assertEquals(numRecords, hoodieIncViewDF1.count())
var countsPerCommit =
hoodieIncViewDF1.groupBy("_hoodie_commit_time").count().collect();
assertEquals(1, countsPerCommit.length)
- assertEquals(commitInstantTime1, countsPerCommit(0).get(0))
+ assertEquals(bootstrapCommitInstantTime, countsPerCommit(0).get(0))
- // incrementally pull only changes in the latest commit, which would pull
only the updated records in the
- // latest commit
+ // incrementally pull only changes after bootstrap commit, which would
pull only the updated records in the
+ // later commits
val hoodieIncViewDF2 = spark.read.format("hudi")
.option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY,
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
- .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY,
commitInstantTime1)
+ .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY,
bootstrapCommitInstantTime)
.load(basePath);
assertEquals(numRecordsUpdate, hoodieIncViewDF2.count())
countsPerCommit =
hoodieIncViewDF2.groupBy("_hoodie_commit_time").count().collect();
assertEquals(1, countsPerCommit.length)
- assertEquals(commitInstantTime2, countsPerCommit(0).get(0))
-
- // pull the latest commit within certain partitions
- val hoodieIncViewDF3 = spark.read.format("hudi")
- .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY,
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
- .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY,
commitInstantTime1)
- .option(DataSourceReadOptions.INCR_PATH_GLOB_OPT_KEY, "/2020-04-02/*")
- .load(basePath)
-
-
assertEquals(hoodieIncViewDF2.filter(col("_hoodie_partition_path").contains("2020-04-02")).count(),
- hoodieIncViewDF3.count())
+ assertEquals(latestCommitInstantTime, countsPerCommit(0).get(0))
+
+ if (isPartitioned) {
+ val relativePartitionPath = if (isHiveStylePartitioned)
"/datestr=2020-04-02/*" else "/2020-04-02/*"
+ // pull the update commits within certain partitions
+ val hoodieIncViewDF3 = spark.read.format("hudi")
+ .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY,
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+ .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY,
bootstrapCommitInstantTime)
+ .option(DataSourceReadOptions.INCR_PATH_GLOB_OPT_KEY,
relativePartitionPath)
+ .load(basePath)
+
+
assertEquals(hoodieIncViewDF2.filter(col("_hoodie_partition_path").contains("2020-04-02")).count(),
+ hoodieIncViewDF3.count())
+ }
}
}