This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/branch-0.x by this push:
new eae85cb5d81 [HUDI-8137] Support time travel query on MDT in Spark
Datasource (#11861)
eae85cb5d81 is described below
commit eae85cb5d81b7aa2f8d613bc5e372ccd395ea871
Author: Jon Vexler <[email protected]>
AuthorDate: Thu Sep 5 22:16:45 2024 -0400
[HUDI-8137] Support time travel query on MDT in Spark Datasource (#11861)
Co-authored-by: Jonathan Vexler <=>
---
.../hudi/common/table/timeline/HoodieTimeline.java | 19 +++
.../hudi/metadata/HoodieBackedTableMetadata.java | 9 +-
.../src/main/scala/org/apache/hudi/Iterators.scala | 2 +-
.../TestMetadataTableWithSparkDataSource.scala | 174 ++++++++++++++++++++-
4 files changed, 199 insertions(+), 5 deletions(-)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
index a7344fc1512..21e337f8165 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
@@ -437,6 +437,25 @@ public interface HoodieTimeline extends Serializable {
return predicateToApply.test(commit1, commit2);
}
+ /**
+ * Returns the smaller of the two given timestamps. Returns the other
argument if one of the arguments is null or empty.
+ */
+ static String minTimestamp(String commit1, String commit2) {
+ if (StringUtils.isNullOrEmpty(commit1)) {
+ return commit2;
+ } else if (StringUtils.isNullOrEmpty(commit2)) {
+ return commit1;
+ }
+ return minInstant(commit1, commit2);
+ }
+
+ /**
+ * Returns the smaller of the given two instants.
+ */
+ static String minInstant(String instant1, String instant2) {
+ return compareTimestamps(instant1, LESSER_THAN, instant2) ? instant1 :
instant2;
+ }
+
/**
* Return true if specified timestamp is in range (startTs, endTs].
*/
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index 31e44b9e212..d4b6f14c272 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -35,6 +35,7 @@ import
org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
@@ -436,7 +437,7 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
// Open the log record scanner using the log files from the latest file
slice
List<HoodieLogFile> logFiles =
slice.getLogFiles().collect(Collectors.toList());
Pair<HoodieMetadataLogRecordReader, Long> logRecordScannerOpenTimePair =
- getLogRecordScanner(logFiles, partitionName, Option.empty());
+ getLogRecordScanner(logFiles, partitionName, Option.empty(),
Option.empty());
HoodieMetadataLogRecordReader logRecordScanner =
logRecordScannerOpenTimePair.getKey();
final long logScannerOpenMs = logRecordScannerOpenTimePair.getValue();
@@ -471,7 +472,8 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
public Pair<HoodieMetadataLogRecordReader, Long>
getLogRecordScanner(List<HoodieLogFile> logFiles,
String
partitionName,
-
Option<Boolean> allowFullScanOverride) {
+
Option<Boolean> allowFullScanOverride,
+
Option<String> timeTravelInstant) {
HoodieTimer timer = HoodieTimer.start();
List<String> sortedLogFilePaths = logFiles.stream()
.sorted(HoodieLogFile.getLogFileComparator())
@@ -485,6 +487,9 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
Option<HoodieInstant> latestMetadataInstant =
metadataMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant();
String latestMetadataInstantTime =
latestMetadataInstant.map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
+ if (timeTravelInstant.isPresent()) {
+ latestMetadataInstantTime =
HoodieTimeline.minTimestamp(latestMetadataInstantTime, timeTravelInstant.get());
+ }
boolean allowFullScan = allowFullScanOverride.orElseGet(() ->
isFullScanAllowedForPartition(partitionName));
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala
index 1cf8bc5b655..4547569845c 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala
@@ -367,7 +367,7 @@ object LogFileIterator extends SparkAdapterSupport {
new StoragePath(tablePath), partitionPath)
val logRecordReader =
- metadataTable.getLogRecordScanner(logFiles.asJava,
relativePartitionPath, toJavaOption(Some(forceFullScan)))
+ metadataTable.getLogRecordScanner(logFiles.asJava,
relativePartitionPath, toJavaOption(Some(forceFullScan)),
toJavaOption(Some(tableState.latestCommitTimestamp.get)))
.getLeft
val recordList = closing(logRecordReader) {
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataTableWithSparkDataSource.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataTableWithSparkDataSource.scala
index 04b37256a01..8fbae35badb 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataTableWithSparkDataSource.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataTableWithSparkDataSource.scala
@@ -18,11 +18,14 @@
package org.apache.hudi.functional
-import org.apache.hudi.DataSourceWriteOptions
+import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions}
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.model.HoodieColumnRangeMetadata
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.HoodieTimeline
+import org.apache.hudi.common.table.view.FileSystemViewManager
import org.apache.hudi.common.testutils.HoodieTestDataGenerator
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
import org.apache.hudi.common.util.ParquetUtils
@@ -33,16 +36,19 @@ import org.apache.hudi.storage.StoragePath
import org.apache.hudi.storage.hadoop.HoodieHadoopStorage
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
import
org.apache.hudi.testutils.SparkClientFunctionalTestHarness.getSparkSqlConf
+import org.apache.hudi.util.JavaScalaConverters.convertJavaListToScalaSeq
import org.apache.spark.SparkConf
import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.functions.{col, explode}
import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.Tag
+import org.junit.jupiter.api.{Tag, Test}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.CsvSource
import java.util
import java.util.Collections
+import java.util.stream.Collectors
import scala.collection.JavaConverters._
@@ -226,4 +232,168 @@ class TestMetadataTableWithSparkDataSource extends
SparkClientFunctionalTestHarn
private def parseRecords(records: Seq[String]) = {
spark.read.json(spark.sparkContext.parallelize(records, 2))
}
+
+ @Test
+ def testTimeTravelQuery(): Unit = {
+ val dataGen = new HoodieTestDataGenerator()
+ val metadataOpts: Map[String, String] = Map(
+ HoodieMetadataConfig.ENABLE.key -> "true",
+ HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true",
+ DataSourceWriteOptions.TABLE_TYPE.key -> "MERGE_ON_READ",
+ HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key -> "5"
+ )
+ val combinedOpts: Map[String, String] = partitionedCommonOpts ++
metadataOpts
+
+ // Insert T0
+ val newRecords = dataGen.generateInserts("000", 100)
+ val newRecordsDF = parseRecords(recordsToStrings(newRecords).asScala.toSeq)
+ newRecordsDF.write.format(hudi)
+ .options(combinedOpts)
+ .option(DataSourceWriteOptions.OPERATION.key,
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+ .mode(SaveMode.Append)
+ .save(basePath)
+
+ //Validate T0
+ val metaClient = HoodieTableMetaClient.builder
+ .setConf(storageConf())
+ .setBasePath(s"$basePath/.hoodie/metadata")
+ .build
+ val timelineT0 = metaClient.getActiveTimeline
+ assertEquals(3, timelineT0.countInstants())
+ assertEquals(HoodieTimeline.DELTA_COMMIT_ACTION,
timelineT0.lastInstant().get().getAction)
+ val t0 = timelineT0.lastInstant().get().getTimestamp
+
+ val filesT0 = getFiles(basePath)
+ assertEquals(3, filesT0.size)
+
+ val baseMetaClient = HoodieTableMetaClient.builder
+ .setConf(storageConf())
+ .setBasePath(basePath)
+ .build
+ val filesT0FS = getFilesFromFs(baseMetaClient)
+ assertEquals(3, filesT0FS.size)
+ assertEquals(3, filesT0.intersect(filesT0FS).size)
+
+ // Update T1
+ val updatedRecords = dataGen.generateUpdates("001", newRecords)
+ val updatedRecordsDF =
parseRecords(recordsToStrings(updatedRecords).asScala.toSeq)
+ updatedRecordsDF.write.format(hudi)
+ .options(combinedOpts)
+ .option(DataSourceWriteOptions.OPERATION.key,
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
+ .mode(SaveMode.Append)
+ .save(basePath)
+
+ //Validate T1
+ val timelineT1 = metaClient.reloadActiveTimeline()
+ assertEquals(4, timelineT1.countInstants())
+ assertEquals(HoodieTimeline.DELTA_COMMIT_ACTION,
timelineT1.lastInstant().get().getAction)
+ val t1 = timelineT1.lastInstant().get().getTimestamp
+
+ val filesT1 = getFiles(basePath)
+ assertEquals(6, filesT1.size)
+ assertEquals(3, filesT1.diff(filesT0).size)
+
+ val filesT1FS = getFilesFromFs(baseMetaClient)
+ assertEquals(6, filesT1FS.size)
+ assertEquals(6, filesT1.intersect(filesT1FS).size)
+
+ val filesT1travelT0 = getFilesAsOf(basePath, t0)
+ assertEquals(3, filesT1travelT0.size)
+ assertEquals(3, filesT1travelT0.intersect(filesT0).size)
+
+ //Update T2
+ val updatedRecords2 = dataGen.generateUpdates("002", updatedRecords)
+ val updatedRecords2DF =
parseRecords(recordsToStrings(updatedRecords2).asScala.toSeq)
+ updatedRecords2DF.write.format(hudi)
+ .options(combinedOpts)
+ .option(DataSourceWriteOptions.OPERATION.key,
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
+ .mode(SaveMode.Append)
+ .save(basePath)
+
+ //Validate T2
+ val timelineT2 = metaClient.reloadActiveTimeline()
+ assertEquals(5, timelineT2.countInstants())
+ assertEquals(HoodieTimeline.DELTA_COMMIT_ACTION,
timelineT2.lastInstant().get().getAction)
+ val t2 = timelineT2.lastInstant().get().getTimestamp
+
+ val filesT2 = getFiles(basePath)
+ assertEquals(9, filesT2.size)
+ assertEquals(3, filesT2.diff(filesT1).size)
+
+ val filesT2FS = getFilesFromFs(baseMetaClient)
+ assertEquals(9, filesT2FS.size)
+ assertEquals(9, filesT2.intersect(filesT2FS).size)
+
+ val filesT2travelT1 = getFilesAsOf(basePath, t1)
+ assertEquals(6, filesT2travelT1.size)
+ assertEquals(6, filesT2travelT1.intersect(filesT1).size)
+
+ val filesT2travelT0 = getFilesAsOf(basePath, t0)
+ assertEquals(3, filesT2travelT0.size)
+ assertEquals(3, filesT2travelT0.intersect(filesT0).size)
+
+ //Update T3
+ val updatedRecords3 = dataGen.generateUpdates("003", updatedRecords2)
+ val updatedRecords3DF =
parseRecords(recordsToStrings(updatedRecords3).asScala.toSeq)
+ updatedRecords3DF.write.format(hudi)
+ .options(combinedOpts)
+ .option(DataSourceWriteOptions.OPERATION.key,
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
+ .mode(SaveMode.Append)
+ .save(basePath)
+
+ //Validate T3
+ val timelineT3 = metaClient.reloadActiveTimeline()
+ assertEquals(7, timelineT3.countInstants())
+ assertEquals(HoodieTimeline.COMMIT_ACTION,
timelineT3.getInstants.get(5).getAction)
+ assertEquals(HoodieTimeline.DELTA_COMMIT_ACTION,
timelineT3.lastInstant().get().getAction)
+
+ val filesT3 = getFiles(basePath)
+ assertEquals(12, filesT3.size)
+ assertEquals(3, filesT3.diff(filesT2).size)
+
+ val filesT3FS = getFilesFromFs(baseMetaClient)
+ assertEquals(12, filesT3FS.size)
+ assertEquals(12, filesT3.intersect(filesT3FS).size)
+
+ val filesT3travelT2 = getFilesAsOf(basePath, t2)
+ assertEquals(9, filesT3travelT2.size)
+ assertEquals(9, filesT3travelT2.intersect(filesT2).size)
+
+ val filesT3travelT1 = getFilesAsOf(basePath, t1)
+ assertEquals(6, filesT3travelT1.size)
+ assertEquals(6, filesT3travelT1.intersect(filesT1).size)
+
+ val filesT3travelT0 = getFilesAsOf(basePath, t0)
+ assertEquals(3, filesT3travelT0.size)
+ assertEquals(3, filesT3travelT0.intersect(filesT0).size)
+ }
+
+ private def getFilesAsOf(basePath: String, timestamp: String):
scala.collection.GenSet[Any] = {
+ getFiles(basePath,
Map(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key() -> timestamp))
+ }
+
+ private def getFiles(basePath: String): scala.collection.GenSet[Any] = {
+ getFiles(basePath, Map.empty)
+ }
+
+ private def getFiles(basePath: String, opts: Map[String, String]):
scala.collection.GenSet[Any] = {
+
spark.read.format(hudi).options(opts).load(s"$basePath/.hoodie/metadata").where("type
= 2").select(explode(col("filesystemMetadata"))).drop("value").rdd.map(r =>
r(0)).collect().toSet
+ }
+
+ private def getFilesFromFs(metaClient: HoodieTableMetaClient):
scala.collection.GenSet[Any] = {
+ val engineContext = new HoodieSparkEngineContext(jsc())
+ val files = new util.ArrayList[String]()
+ val fsview =
FileSystemViewManager.createInMemoryFileSystemView(engineContext,
+ metaClient, HoodieMetadataConfig.newBuilder.enable(false).build())
+ fsview.loadAllPartitions()
+
convertJavaListToScalaSeq(fsview.getAllFileGroups.collect(Collectors.toList())).foreach(fg
=> {
+
convertJavaListToScalaSeq(fg.getAllFileSlices.collect(Collectors.toList())).foreach(fileSlice
=> {
+ if (fileSlice.getBaseFile.isPresent) {
+ files.add(fileSlice.getBaseFile.get().getFileName)
+ }
+
convertJavaListToScalaSeq(fileSlice.getLogFiles.collect(Collectors.toList())).foreach(logFile
=> files.add(logFile.getFileName))
+ })
+ })
+ files.toArray.toSet
+ }
}