This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new c91d7e1f78d [HUDI-5990] Avoid missing data during incremental queries (#8299)
c91d7e1f78d is described below
commit c91d7e1f78dbb4a12dab23b5d4b147bfb150002a
Author: rfyu <[email protected]>
AuthorDate: Fri Apr 14 01:15:13 2023 +0800
[HUDI-5990] Avoid missing data during incremental queries (#8299)
The reason for the missing data is that the timeline used by
`MergeOnReadIncrementalRelation` only contains completed
instants. When the incremental range contains an incomplete
compaction plan, `fsView.getLatestMergedFileSlicesBeforeOrOn`
in `collectFileSplits` filters out some file slices.
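For illustration, a minimal sketch (not part of the patch) of how the two
timeline views differ over the same incremental range, using only the
timeline APIs that appear in the diff below; `tablePath`, `start` and `end`
are assumed placeholders, and the completed-only view stands in for the old
`super.timeline` behaviour:

    import org.apache.hudi.common.table.HoodieTableMetaClient
    import org.apache.spark.sql.SparkSession

    object TimelineViews {
      def show(spark: SparkSession, tablePath: String, start: String, end: String): Unit = {
        val metaClient = HoodieTableMetaClient.builder()
          .setConf(spark.sparkContext.hadoopConfiguration)
          .setBasePath(tablePath)
          .setLoadActiveTimelineOnLoad(true)
          .build()

        // Completed instants only: a pending compaction inside [start, end] is
        // invisible, so getLatestMergedFileSlicesBeforeOrOn can drop file slices.
        val completedOnly = metaClient.getCommitsAndCompactionTimeline
          .filterCompletedInstants()
          .findInstantsInRange(start, end)

        // With pending compaction instants kept (the behaviour after this change),
        // the compaction plan stays visible and the file slices are retained.
        val withPendingCompaction = metaClient.getCommitsAndCompactionTimeline
          .findInstantsInRange(start, end)

        println(s"completed-only instants:        ${completedOnly.countInstants()}")
        println(s"incl. pending compaction plans: ${withPendingCompaction.countInstants()}")
      }
    }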
---
.../hudi/MergeOnReadIncrementalRelation.scala | 4 +-
.../functional/TestParquetColumnProjection.scala | 75 ++++++++++++++++++++--
2 files changed, 73 insertions(+), 6 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
index 93bf730a56d..636624f3950 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
@@ -60,9 +60,9 @@ case class MergeOnReadIncrementalRelation(override val sqlContext: SQLContext,
override protected def timeline: HoodieTimeline = {
if (fullTableScan) {
- super.timeline
+ metaClient.getCommitsAndCompactionTimeline
} else {
- super.timeline.findInstantsInRange(startTimestamp, endTimestamp)
+      metaClient.getCommitsAndCompactionTimeline.findInstantsInRange(startTimestamp, endTimestamp)
}
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
index 0eefc7beeec..eaf1839d5dc 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
@@ -22,12 +22,13 @@ import org.apache.calcite.runtime.SqlFunctions.abs
import org.apache.hudi.HoodieBaseRelation.projectSchema
 import org.apache.hudi.common.config.{HoodieMetadataConfig, HoodieStorageConfig}
 import org.apache.hudi.common.model.{HoodieRecord, OverwriteNonDefaultsWithLatestAvroPayload}
-import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.common.testutils.{HadoopMapRedUtils, HoodieTestDataGenerator}
-import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.config.{HoodieCompactionConfig, HoodieWriteConfig}
+import org.apache.hudi.keygen.NonpartitionedKeyGenerator
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness.getSparkSqlConf
-import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, DefaultSource, HoodieBaseRelation, HoodieSparkUtils, HoodieUnsafeRDD}
+import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, DefaultSource, HoodieBaseRelation, HoodieMergeOnReadRDD, HoodieSparkUtils, HoodieUnsafeRDD}
import org.apache.parquet.hadoop.util.counters.BenchmarkCounter
import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
@@ -252,7 +253,6 @@ class TestParquetColumnProjection extends SparkClientFunctionalTestHarness with
     runTest(tableState, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL, DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL, fullColumnsReadStats)
}
- // TODO add test for incremental query of the table with logs
@Test
def testMergeOnReadIncrementalRelationWithNoDeltaLogs(): Unit = {
val tablePath = s"$basePath/mor-no-logs"
@@ -296,6 +296,41 @@ class TestParquetColumnProjection extends SparkClientFunctionalTestHarness with
projectedColumnsReadStats, incrementalOpts)
}
+ @Test
+ def testMergeOnReadIncrementalRelationWithDeltaLogs(): Unit = {
+ val tablePath = s"$basePath/mor-with-logs-incr"
+ val targetRecordsCount = 100
+
+    bootstrapMORTableWithDeltaLog(tablePath, targetRecordsCount, defaultWriteOpts, populateMetaFields = true)
+
+ println(s"Running test for $tablePath / incremental")
+    /**
+     * State of timeline and updated data
+     * +--------------+--------------+--------------+--------------+--------------------+--------------+--------------+--------------+
+     * | timeline     | deltacommit1 | deltacommit2 | deltacommit3 | compaction.request | deltacommit4 | deltacommit5 | deltacommit6 |
+     * +--------------+--------------+--------------+--------------+--------------------+--------------+--------------+--------------+
+     * | updated data | 001          | 002          | 003          |                    | 004          | 005          | 006          |
+     * +--------------+--------------+--------------+--------------+--------------------+--------------+--------------+--------------+
+     */
+    val hoodieMetaClient = HoodieTableMetaClient.builder().setConf(spark.sparkContext.hadoopConfiguration).setBasePath(tablePath).setLoadActiveTimelineOnLoad(true).build()
+    val completedCommits = hoodieMetaClient.getCommitsAndCompactionTimeline.filterCompletedInstants()
+    val startUnarchivedCommitTs = completedCommits.nthInstant(1).get().getTimestamp //deltacommit2
+    val endUnarchivedCommitTs = completedCommits.nthInstant(5).get().getTimestamp //deltacommit6
+
+ val readOpts = defaultWriteOpts ++ Map(
+ "path" -> tablePath,
+      DataSourceReadOptions.QUERY_TYPE.key -> DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL,
+ DataSourceReadOptions.BEGIN_INSTANTTIME.key -> startUnarchivedCommitTs,
+ DataSourceReadOptions.END_INSTANTTIME.key -> endUnarchivedCommitTs
+ )
+
+ val inputDf = spark.read.format("hudi")
+ .options(readOpts)
+ .load()
+ val commitNum = inputDf.select("rider").distinct().collect().length
+ assertTrue(commitNum > 1)
+ }
+
// Test routine
private def runTest(tableState: TableState,
queryType: String,
@@ -403,6 +438,38 @@ class TestParquetColumnProjection extends SparkClientFunctionalTestHarness with
}
}
+  private def bootstrapMORTableWithDeltaLog(path: String,
+                                            recordCount: Int,
+                                            opts: Map[String, String],
+                                            populateMetaFields: Boolean,
+                                            dataGenOpt: Option[HoodieTestDataGenerator] = None): (List[HoodieRecord[_]], Schema) = {
+ val dataGen = dataGenOpt.getOrElse(new HoodieTestDataGenerator(0x12345))
+
+ // Step 1: Bootstrap table w/ N records (t/h bulk-insert)
+    val (insertedRecords, schema) = bootstrapTable(path, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, recordCount, opts, populateMetaFields, Some(dataGen))
+
+ for (i <- 2 to 6) {
+ val updatesCount = (insertedRecords.length * 0.5).toInt
+      val recordsToUpdate = scala.util.Random.shuffle(insertedRecords).take(updatesCount)
+      val updatedRecords = dataGen.generateUpdates("%03d".format(i), recordsToUpdate.asJava)
+
+ // Step 2: Update M records out of those (t/h update)
+      val inputDF = toDataset(updatedRecords, HoodieTestDataGenerator.AVRO_SCHEMA)
+
+ inputDF.write.format("org.apache.hudi")
+ .options(opts)
+        .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
+ .option(HoodieCompactionConfig.SCHEDULE_INLINE_COMPACT.key, "true")
+ .option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE.key, "false")
+        .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key, "3")
+        .option(HoodieTableConfig.POPULATE_META_FIELDS.key, populateMetaFields.toString)
+ .mode(SaveMode.Append)
+ .save(path)
+ }
+
+ (insertedRecords, schema)
+ }
+
def measureBytesRead[T](f: () => T): (T, Int) = {
     // Init BenchmarkCounter to report number of bytes actually read from the Block
     BenchmarkCounter.initCounterFromReporter(HadoopMapRedUtils.createTestReporter, fs.getConf)