This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/branch-0.x by this push:
     new eae85cb5d81 [HUDI-8137] Support time travel query on MDT in Spark 
Datasource (#11861)
eae85cb5d81 is described below

commit eae85cb5d81b7aa2f8d613bc5e372ccd395ea871
Author: Jon Vexler <[email protected]>
AuthorDate: Thu Sep 5 22:16:45 2024 -0400

    [HUDI-8137] Support time travel query on MDT in Spark Datasource (#11861)
    
    Co-authored-by: Jonathan Vexler <=>
---
 .../hudi/common/table/timeline/HoodieTimeline.java |  19 +++
 .../hudi/metadata/HoodieBackedTableMetadata.java   |   9 +-
 .../src/main/scala/org/apache/hudi/Iterators.scala |   2 +-
 .../TestMetadataTableWithSparkDataSource.scala     | 174 ++++++++++++++++++++-
 4 files changed, 199 insertions(+), 5 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
index a7344fc1512..21e337f8165 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
@@ -437,6 +437,25 @@ public interface HoodieTimeline extends Serializable {
     return predicateToApply.test(commit1, commit2);
   }
 
+  /**
+   * Returns the smaller of the two given timestamps. Returns the non-null argument if one of the arguments is null.
+   */
+  static String minTimestamp(String commit1, String commit2) {
+    if (StringUtils.isNullOrEmpty(commit1)) {
+      return commit2;
+    } else if (StringUtils.isNullOrEmpty(commit2)) {
+      return commit1;
+    }
+    return minInstant(commit1, commit2);
+  }
+
+  /**
+   * Returns the smaller of the given two instants.
+   */
+  static String minInstant(String instant1, String instant2) {
+    return compareTimestamps(instant1, LESSER_THAN, instant2) ? instant1 : 
instant2;
+  }
+
   /**
    * Return true if specified timestamp is in range (startTs, endTs].
    */
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index 31e44b9e212..d4b6f14c272 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -35,6 +35,7 @@ import 
org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
@@ -436,7 +437,7 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
       // Open the log record scanner using the log files from the latest file 
slice
       List<HoodieLogFile> logFiles = 
slice.getLogFiles().collect(Collectors.toList());
       Pair<HoodieMetadataLogRecordReader, Long> logRecordScannerOpenTimePair =
-          getLogRecordScanner(logFiles, partitionName, Option.empty());
+          getLogRecordScanner(logFiles, partitionName, Option.empty(), 
Option.empty());
       HoodieMetadataLogRecordReader logRecordScanner = 
logRecordScannerOpenTimePair.getKey();
       final long logScannerOpenMs = logRecordScannerOpenTimePair.getValue();
 
@@ -471,7 +472,8 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
 
   public Pair<HoodieMetadataLogRecordReader, Long> 
getLogRecordScanner(List<HoodieLogFile> logFiles,
                                                                        String 
partitionName,
-                                                                       
Option<Boolean> allowFullScanOverride) {
+                                                                       
Option<Boolean> allowFullScanOverride,
+                                                                       
Option<String> timeTravelInstant) {
     HoodieTimer timer = HoodieTimer.start();
     List<String> sortedLogFilePaths = logFiles.stream()
         .sorted(HoodieLogFile.getLogFileComparator())
@@ -485,6 +487,9 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
 
     Option<HoodieInstant> latestMetadataInstant = 
metadataMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant();
     String latestMetadataInstantTime = 
latestMetadataInstant.map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
+    if (timeTravelInstant.isPresent()) {
+      latestMetadataInstantTime = 
HoodieTimeline.minTimestamp(latestMetadataInstantTime, timeTravelInstant.get());
+    }
 
     boolean allowFullScan = allowFullScanOverride.orElseGet(() -> 
isFullScanAllowedForPartition(partitionName));
 
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala
index 1cf8bc5b655..4547569845c 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala
@@ -367,7 +367,7 @@ object LogFileIterator extends SparkAdapterSupport {
         new StoragePath(tablePath), partitionPath)
 
       val logRecordReader =
-        metadataTable.getLogRecordScanner(logFiles.asJava, 
relativePartitionPath, toJavaOption(Some(forceFullScan)))
+        metadataTable.getLogRecordScanner(logFiles.asJava, 
relativePartitionPath, toJavaOption(Some(forceFullScan)), 
toJavaOption(Some(tableState.latestCommitTimestamp.get)))
           .getLeft
 
       val recordList = closing(logRecordReader) {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataTableWithSparkDataSource.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataTableWithSparkDataSource.scala
index 04b37256a01..8fbae35badb 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataTableWithSparkDataSource.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataTableWithSparkDataSource.scala
@@ -18,11 +18,14 @@
 
 package org.apache.hudi.functional
 
-import org.apache.hudi.DataSourceWriteOptions
+import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions}
 import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.common.config.HoodieMetadataConfig
 import org.apache.hudi.common.model.HoodieColumnRangeMetadata
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.HoodieTimeline
+import org.apache.hudi.common.table.view.FileSystemViewManager
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
 import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
 import org.apache.hudi.common.util.ParquetUtils
@@ -33,16 +36,19 @@ import org.apache.hudi.storage.StoragePath
 import org.apache.hudi.storage.hadoop.HoodieHadoopStorage
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
 import 
org.apache.hudi.testutils.SparkClientFunctionalTestHarness.getSparkSqlConf
+import org.apache.hudi.util.JavaScalaConverters.convertJavaListToScalaSeq
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.functions.{col, explode}
 import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.Tag
+import org.junit.jupiter.api.{Tag, Test}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.CsvSource
 
 import java.util
 import java.util.Collections
+import java.util.stream.Collectors
 
 import scala.collection.JavaConverters._
 
@@ -226,4 +232,168 @@ class TestMetadataTableWithSparkDataSource extends 
SparkClientFunctionalTestHarn
   private def parseRecords(records: Seq[String]) = {
     spark.read.json(spark.sparkContext.parallelize(records, 2))
   }
+
+  @Test
+  def testTimeTravelQuery(): Unit = {
+    val dataGen = new HoodieTestDataGenerator()
+    val metadataOpts: Map[String, String] = Map(
+      HoodieMetadataConfig.ENABLE.key -> "true",
+      HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true",
+      DataSourceWriteOptions.TABLE_TYPE.key -> "MERGE_ON_READ",
+      HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key -> "5"
+    )
+    val combinedOpts: Map[String, String] = partitionedCommonOpts ++ 
metadataOpts
+
+    // Insert T0
+    val newRecords = dataGen.generateInserts("000", 100)
+    val newRecordsDF = parseRecords(recordsToStrings(newRecords).asScala.toSeq)
+    newRecordsDF.write.format(hudi)
+      .options(combinedOpts)
+      .option(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .mode(SaveMode.Append)
+      .save(basePath)
+
+    //Validate T0
+    val metaClient = HoodieTableMetaClient.builder
+      .setConf(storageConf())
+      .setBasePath(s"$basePath/.hoodie/metadata")
+      .build
+    val timelineT0 = metaClient.getActiveTimeline
+    assertEquals(3, timelineT0.countInstants())
+    assertEquals(HoodieTimeline.DELTA_COMMIT_ACTION, 
timelineT0.lastInstant().get().getAction)
+    val t0 = timelineT0.lastInstant().get().getTimestamp
+
+    val filesT0 = getFiles(basePath)
+    assertEquals(3, filesT0.size)
+
+    val baseMetaClient = HoodieTableMetaClient.builder
+      .setConf(storageConf())
+      .setBasePath(basePath)
+      .build
+    val filesT0FS = getFilesFromFs(baseMetaClient)
+    assertEquals(3, filesT0FS.size)
+    assertEquals(3, filesT0.intersect(filesT0FS).size)
+
+    // Update T1
+    val updatedRecords = dataGen.generateUpdates("001", newRecords)
+    val updatedRecordsDF = 
parseRecords(recordsToStrings(updatedRecords).asScala.toSeq)
+    updatedRecordsDF.write.format(hudi)
+      .options(combinedOpts)
+      .option(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
+      .mode(SaveMode.Append)
+      .save(basePath)
+
+    //Validate T1
+    val timelineT1 = metaClient.reloadActiveTimeline()
+    assertEquals(4, timelineT1.countInstants())
+    assertEquals(HoodieTimeline.DELTA_COMMIT_ACTION, 
timelineT1.lastInstant().get().getAction)
+    val t1 =  timelineT1.lastInstant().get().getTimestamp
+
+    val filesT1 = getFiles(basePath)
+    assertEquals(6, filesT1.size)
+    assertEquals(3, filesT1.diff(filesT0).size)
+
+    val filesT1FS = getFilesFromFs(baseMetaClient)
+    assertEquals(6, filesT1FS.size)
+    assertEquals(6, filesT1.intersect(filesT1FS).size)
+
+    val filesT1travelT0 = getFilesAsOf(basePath, t0)
+    assertEquals(3, filesT1travelT0.size)
+    assertEquals(3, filesT1travelT0.intersect(filesT0).size)
+
+    //Update T2
+    val updatedRecords2 = dataGen.generateUpdates("002", updatedRecords)
+    val updatedRecords2DF = 
parseRecords(recordsToStrings(updatedRecords2).asScala.toSeq)
+    updatedRecords2DF.write.format(hudi)
+      .options(combinedOpts)
+      .option(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
+      .mode(SaveMode.Append)
+      .save(basePath)
+
+    //Validate T2
+    val timelineT2 = metaClient.reloadActiveTimeline()
+    assertEquals(5, timelineT2.countInstants())
+    assertEquals(HoodieTimeline.DELTA_COMMIT_ACTION, 
timelineT2.lastInstant().get().getAction)
+    val t2 =  timelineT2.lastInstant().get().getTimestamp
+
+    val filesT2 = getFiles(basePath)
+    assertEquals(9, filesT2.size)
+    assertEquals(3, filesT2.diff(filesT1).size)
+
+    val filesT2FS = getFilesFromFs(baseMetaClient)
+    assertEquals(9, filesT2FS.size)
+    assertEquals(9, filesT2.intersect(filesT2FS).size)
+
+    val filesT2travelT1 = getFilesAsOf(basePath, t1)
+    assertEquals(6, filesT2travelT1.size)
+    assertEquals(6, filesT2travelT1.intersect(filesT1).size)
+
+    val filesT2travelT0 = getFilesAsOf(basePath, t0)
+    assertEquals(3, filesT2travelT0.size)
+    assertEquals(3, filesT2travelT0.intersect(filesT0).size)
+
+    //Update T3
+    val updatedRecords3 = dataGen.generateUpdates("003", updatedRecords2)
+    val updatedRecords3DF = 
parseRecords(recordsToStrings(updatedRecords3).asScala.toSeq)
+    updatedRecords3DF.write.format(hudi)
+      .options(combinedOpts)
+      .option(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
+      .mode(SaveMode.Append)
+      .save(basePath)
+
+    //Validate T3
+    val timelineT3 = metaClient.reloadActiveTimeline()
+    assertEquals(7, timelineT3.countInstants())
+    assertEquals(HoodieTimeline.COMMIT_ACTION, 
timelineT3.getInstants.get(5).getAction)
+    assertEquals(HoodieTimeline.DELTA_COMMIT_ACTION, 
timelineT3.lastInstant().get().getAction)
+
+    val filesT3 = getFiles(basePath)
+    assertEquals(12, filesT3.size)
+    assertEquals(3, filesT3.diff(filesT2).size)
+
+    val filesT3FS = getFilesFromFs(baseMetaClient)
+    assertEquals(12, filesT3FS.size)
+    assertEquals(12, filesT3.intersect(filesT3FS).size)
+
+    val filesT3travelT2 = getFilesAsOf(basePath, t2)
+    assertEquals(9, filesT3travelT2.size)
+    assertEquals(9, filesT3travelT2.intersect(filesT2).size)
+
+    val filesT3travelT1 = getFilesAsOf(basePath, t1)
+    assertEquals(6, filesT3travelT1.size)
+    assertEquals(6, filesT3travelT1.intersect(filesT1).size)
+
+    val filesT3travelT0 = getFilesAsOf(basePath, t0)
+    assertEquals(3, filesT3travelT0.size)
+    assertEquals(3, filesT3travelT0.intersect(filesT0).size)
+  }
+
+  private def getFilesAsOf(basePath: String, timestamp: String): 
scala.collection.GenSet[Any] = {
+    getFiles(basePath, 
Map(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key() -> timestamp))
+  }
+
+  private def getFiles(basePath: String): scala.collection.GenSet[Any] = {
+    getFiles(basePath, Map.empty)
+  }
+
+  private def getFiles(basePath: String, opts: Map[String, String]): 
scala.collection.GenSet[Any] = {
+    
spark.read.format(hudi).options(opts).load(s"$basePath/.hoodie/metadata").where("type
 = 2").select(explode(col("filesystemMetadata"))).drop("value").rdd.map(r => 
r(0)).collect().toSet
+  }
+
+  private def getFilesFromFs(metaClient: HoodieTableMetaClient): 
scala.collection.GenSet[Any] = {
+    val engineContext = new HoodieSparkEngineContext(jsc())
+    val files = new util.ArrayList[String]()
+    val fsview = 
FileSystemViewManager.createInMemoryFileSystemView(engineContext,
+      metaClient, HoodieMetadataConfig.newBuilder.enable(false).build())
+    fsview.loadAllPartitions()
+    
convertJavaListToScalaSeq(fsview.getAllFileGroups.collect(Collectors.toList())).foreach(fg
 => {
+      
convertJavaListToScalaSeq(fg.getAllFileSlices.collect(Collectors.toList())).foreach(fileSlice
 => {
+        if (fileSlice.getBaseFile.isPresent) {
+          files.add(fileSlice.getBaseFile.get().getFileName)
+        }
+        
convertJavaListToScalaSeq(fileSlice.getLogFiles.collect(Collectors.toList())).foreach(logFile
 => files.add(logFile.getFileName))
+      })
+    })
+    files.toArray.toSet
+  }
 }

Reply via email to