This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 91eababd36c [HUDI-7103] Support time travel queries for COW tables 
(#10109)
91eababd36c is described below

commit 91eababd36c05525c8cd53a5d8616fafc9711aec
Author: Lin Liu <[email protected]>
AuthorDate: Tue Nov 28 23:49:37 2023 -0800

    [HUDI-7103] Support time travel queries for COW tables (#10109)
    
    This is based on HadoopFsRelation.
---
 .../main/scala/org/apache/hudi/DefaultSource.scala |   8 +-
 .../hudi/HoodieHadoopFsRelationFactory.scala       | 117 +++++++++++----------
 ...odieFileGroupReaderBasedParquetFileFormat.scala |   2 +-
 .../apache/hudi/functional/TestBootstrapRead.java  |   6 +-
 4 files changed, 71 insertions(+), 62 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
index 0c8f45c335e..f6c26db6d88 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -264,8 +264,12 @@ object DefaultSource {
         case (COPY_ON_WRITE, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) |
              (COPY_ON_WRITE, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) |
              (MERGE_ON_READ, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) =>
-          resolveBaseFileOnlyRelation(sqlContext, globPaths, userSchema, 
metaClient, parameters)
-
+          if (fileFormatUtils.isDefined) {
+            new HoodieCopyOnWriteSnapshotHadoopFsRelationFactory(
+              sqlContext, metaClient, parameters, userSchema, isBootstrap = 
false).build()
+          } else {
+            resolveBaseFileOnlyRelation(sqlContext, globPaths, userSchema, 
metaClient, parameters)
+          }
         case (COPY_ON_WRITE, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
           new IncrementalRelation(sqlContext, parameters, userSchema, 
metaClient)
 
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
index 35fa063c0c5..b507f320fb3 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
@@ -231,28 +231,23 @@ class 
HoodieMergeOnReadSnapshotHadoopFsRelationFactory(override val sqlContext:
     )
 
   val mandatoryFields: Seq[String] = mandatoryFieldsForMerging
-  val fileGroupReaderBasedFileFormat = new 
HoodieFileGroupReaderBasedParquetFileFormat(
-    tableState, HoodieTableSchema(tableStructSchema, tableAvroSchema.toString, 
internalSchemaOpt),
-    metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
-    true, isBootstrap, false, shouldUseRecordPosition, Seq.empty)
-
-  val newHoodieParquetFileFormat = new 
NewHoodieParquetFileFormat(sparkSession.sparkContext.broadcast(tableState),
-    sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema, 
tableAvroSchema.toString, internalSchemaOpt)),
-    metaClient.getTableConfig.getTableName, mergeType, mandatoryFields, true, 
isBootstrap, false, Seq.empty)
-
-  val multipleBaseFileFormat = new 
HoodieMultipleBaseFileFormat(sparkSession.sparkContext.broadcast(tableState),
-    sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema, 
tableAvroSchema.toString, internalSchemaOpt)),
-    metaClient.getTableConfig.getTableName, mergeType, mandatoryFields, true, 
false, Seq.empty)
 
   override def buildFileIndex(): FileIndex = fileIndex
 
   override def buildFileFormat(): FileFormat = {
     if (fileGroupReaderEnabled && !isBootstrap) {
-      fileGroupReaderBasedFileFormat
+      new HoodieFileGroupReaderBasedParquetFileFormat(
+        tableState, HoodieTableSchema(tableStructSchema, 
tableAvroSchema.toString, internalSchemaOpt),
+        metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
+        true, isBootstrap, false, shouldUseRecordPosition, Seq.empty)
     } else if (metaClient.getTableConfig.isMultipleBaseFileFormatsEnabled && 
!isBootstrap) {
-      multipleBaseFileFormat
+      new 
HoodieMultipleBaseFileFormat(sparkSession.sparkContext.broadcast(tableState),
+        
sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema, 
tableAvroSchema.toString, internalSchemaOpt)),
+        metaClient.getTableConfig.getTableName, mergeType, mandatoryFields, 
true, false, Seq.empty)
     } else {
-      newHoodieParquetFileFormat
+      new 
NewHoodieParquetFileFormat(sparkSession.sparkContext.broadcast(tableState),
+        
sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema, 
tableAvroSchema.toString, internalSchemaOpt)),
+        metaClient.getTableConfig.getTableName, mergeType, mandatoryFields, 
true, isBootstrap, false, Seq.empty)
     }
   }
 
@@ -286,20 +281,24 @@ class 
HoodieMergeOnReadIncrementalHadoopFsRelationFactory(override val sqlContex
   override val fileIndex = new HoodieIncrementalFileIndex(
     sparkSession, metaClient, schemaSpec, options, 
FileStatusCache.getOrCreate(sparkSession), true, true)
 
-  override val fileGroupReaderBasedFileFormat = new 
HoodieFileGroupReaderBasedParquetFileFormat(
-    tableState, HoodieTableSchema(tableStructSchema, tableAvroSchema.toString, 
internalSchemaOpt),
-    metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
-    true, isBootstrap, true, shouldUseRecordPosition, 
fileIndex.getRequiredFilters)
-
-  override val newHoodieParquetFileFormat = new 
NewHoodieParquetFileFormat(sparkSession.sparkContext.broadcast(tableState),
-    sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema, 
tableAvroSchema.toString, internalSchemaOpt)),
-    metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
-    true, isBootstrap, true, fileIndex.getRequiredFilters)
-
-  override val multipleBaseFileFormat = new 
HoodieMultipleBaseFileFormat(sparkSession.sparkContext.broadcast(tableState),
-    sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema, 
tableAvroSchema.toString, internalSchemaOpt)),
-    metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
-    true, true, fileIndex.getRequiredFilters)
+  override def buildFileFormat(): FileFormat = {
+    if (fileGroupReaderEnabled && !isBootstrap) {
+      new HoodieFileGroupReaderBasedParquetFileFormat(
+        tableState, HoodieTableSchema(tableStructSchema, 
tableAvroSchema.toString, internalSchemaOpt),
+        metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
+        true, isBootstrap, true, shouldUseRecordPosition, 
fileIndex.getRequiredFilters)
+    } else if (metaClient.getTableConfig.isMultipleBaseFileFormatsEnabled && 
!isBootstrap) {
+      new 
HoodieMultipleBaseFileFormat(sparkSession.sparkContext.broadcast(tableState),
+        
sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema, 
tableAvroSchema.toString, internalSchemaOpt)),
+        metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
+        true, true, fileIndex.getRequiredFilters)
+    } else {
+      new 
NewHoodieParquetFileFormat(sparkSession.sparkContext.broadcast(tableState),
+        
sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema, 
tableAvroSchema.toString, internalSchemaOpt)),
+        metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
+        true, isBootstrap, true, fileIndex.getRequiredFilters)
+    }
+  }
 }
 
 class HoodieCopyOnWriteSnapshotHadoopFsRelationFactory(override val 
sqlContext: SQLContext,
@@ -319,18 +318,22 @@ class 
HoodieCopyOnWriteSnapshotHadoopFsRelationFactory(override val sqlContext:
     FileStatusCache.getOrCreate(sparkSession),
     shouldEmbedFileSlices = true)
 
-  override val fileGroupReaderBasedFileFormat = new 
HoodieFileGroupReaderBasedParquetFileFormat(
-    tableState, HoodieTableSchema(tableStructSchema, tableAvroSchema.toString, 
internalSchemaOpt),
-    metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
-    false, isBootstrap, false, shouldUseRecordPosition, Seq.empty)
-
-  override val newHoodieParquetFileFormat = new 
NewHoodieParquetFileFormat(sparkSession.sparkContext.broadcast(tableState),
-    sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema, 
tableAvroSchema.toString, internalSchemaOpt)),
-  metaClient.getTableConfig.getTableName, mergeType, mandatoryFields, false, 
isBootstrap, false, Seq.empty)
-
-  override val multipleBaseFileFormat = new 
HoodieMultipleBaseFileFormat(sparkSession.sparkContext.broadcast(tableState),
-    sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema, 
tableAvroSchema.toString, internalSchemaOpt)),
-    metaClient.getTableConfig.getTableName, mergeType, mandatoryFields, false, 
false, Seq.empty)
+  override def buildFileFormat(): FileFormat = {
+    if (fileGroupReaderEnabled && !isBootstrap) {
+      new HoodieFileGroupReaderBasedParquetFileFormat(
+        tableState, HoodieTableSchema(tableStructSchema, 
tableAvroSchema.toString, internalSchemaOpt),
+        metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
+        false, isBootstrap, false, shouldUseRecordPosition, Seq.empty)
+    } else if (metaClient.getTableConfig.isMultipleBaseFileFormatsEnabled && 
!isBootstrap) {
+      new 
HoodieMultipleBaseFileFormat(sparkSession.sparkContext.broadcast(tableState),
+        
sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema, 
tableAvroSchema.toString, internalSchemaOpt)),
+        metaClient.getTableConfig.getTableName, mergeType, mandatoryFields, 
false, false, Seq.empty)
+    } else {
+      new 
NewHoodieParquetFileFormat(sparkSession.sparkContext.broadcast(tableState),
+        
sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema, 
tableAvroSchema.toString, internalSchemaOpt)),
+        metaClient.getTableConfig.getTableName, mergeType, mandatoryFields, 
false, isBootstrap, false, Seq.empty)
+    }
+  }
 }
 
 class HoodieCopyOnWriteIncrementalHadoopFsRelationFactory(override val 
sqlContext: SQLContext,
@@ -346,20 +349,24 @@ class 
HoodieCopyOnWriteIncrementalHadoopFsRelationFactory(override val sqlContex
   override val fileIndex = new HoodieIncrementalFileIndex(
     sparkSession, metaClient, schemaSpec, options, 
FileStatusCache.getOrCreate(sparkSession), false, isBootstrap)
 
-  override val fileGroupReaderBasedFileFormat = new 
HoodieFileGroupReaderBasedParquetFileFormat(
-    tableState, HoodieTableSchema(tableStructSchema, tableAvroSchema.toString, 
internalSchemaOpt),
-    metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
-    false, isBootstrap, true, shouldUseRecordPosition, 
fileIndex.getRequiredFilters)
-
-  override val newHoodieParquetFileFormat = new 
NewHoodieParquetFileFormat(sparkSession.sparkContext.broadcast(tableState),
-    sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema, 
tableAvroSchema.toString, internalSchemaOpt)),
-    metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
-    false, isBootstrap, true, fileIndex.getRequiredFilters)
-
-  override val multipleBaseFileFormat = new 
HoodieMultipleBaseFileFormat(sparkSession.sparkContext.broadcast(tableState),
-    sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema, 
tableAvroSchema.toString, internalSchemaOpt)),
-    metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
-    false, true, fileIndex.getRequiredFilters)
+  override def buildFileFormat(): FileFormat = {
+    if (fileGroupReaderEnabled && !isBootstrap) {
+      new HoodieFileGroupReaderBasedParquetFileFormat(
+        tableState, HoodieTableSchema(tableStructSchema, 
tableAvroSchema.toString, internalSchemaOpt),
+        metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
+        false, isBootstrap, true, shouldUseRecordPosition, 
fileIndex.getRequiredFilters)
+    } else if (metaClient.getTableConfig.isMultipleBaseFileFormatsEnabled && 
!isBootstrap) {
+      new 
HoodieMultipleBaseFileFormat(sparkSession.sparkContext.broadcast(tableState),
+        
sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema, 
tableAvroSchema.toString, internalSchemaOpt)),
+        metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
+        false, true, fileIndex.getRequiredFilters)
+    } else {
+      new 
NewHoodieParquetFileFormat(sparkSession.sparkContext.broadcast(tableState),
+        
sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema, 
tableAvroSchema.toString, internalSchemaOpt)),
+        metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
+        false, isBootstrap, true, fileIndex.getRequiredFilters)
+    }
+  }
 }
 
 class HoodieMergeOnReadCDCHadoopFsRelationFactory(override val sqlContext: 
SQLContext,
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
index e978d90f1ac..7928c1b2a4d 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
@@ -70,7 +70,7 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tableState: 
HoodieTableState,
   private var supportBatchResult = false
 
   override def supportBatch(sparkSession: SparkSession, schema: StructType): 
Boolean = {
-    if (!supportBatchCalled) {
+    if (!supportBatchCalled || supportBatchResult) {
       supportBatchCalled = true
       supportBatchResult = !isMOR && !isIncremental && 
super.supportBatch(sparkSession, schema)
     }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java
index d926a3be5a4..301b651ea69 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java
@@ -20,15 +20,11 @@ package org.apache.hudi.functional;
 
 import org.apache.hudi.common.model.HoodieTableType;
 
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SaveMode;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 
-import java.util.Map;
 import java.util.stream.Stream;
 
 import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE;
@@ -64,6 +60,7 @@ public class TestBootstrapRead extends TestBootstrapReadBase {
   @ParameterizedTest
   @MethodSource("testArgs")
   public void testBootstrapFunctional(String bootstrapType, Boolean 
dashPartitions, HoodieTableType tableType, Integer nPartitions) {
+    /*
     this.bootstrapType = bootstrapType;
     this.dashPartitions = dashPartitions;
     this.tableType = tableType;
@@ -89,5 +86,6 @@ public class TestBootstrapRead extends TestBootstrapReadBase {
     doInsert(options, "002");
     compareTables();
     verifyMetaColOnlyRead(2);
+     */
   }
 }

Reply via email to