This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new c3848195799 [HUDI-7060] Disable new file group reader or new parquet file format for CDC, time travel, and incremental query types (#10032)
c3848195799 is described below

commit c38481957996606076b7440e18de388cf1d103c1
Author: Lin Liu <[email protected]>
AuthorDate: Thu Nov 9 08:54:19 2023 -0800

    [HUDI-7060] Disable new file group reader or new parquet file format for CDC, time travel, and incremental query types (#10032)
    
    1. For CDC queries, we use neither of the two new file formats; we keep using the existing CDCRelation.
    2. For time-travel and bootstrap queries, we use the new Spark parquet file format but not the file-group-reader-based file format.
    3. For incremental queries, we keep using the existing relations, i.e., IncrementalRelation and MergeOnReadIncrementalRelation (sketched below).
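
    A simplified sketch of the resulting dispatch (not the literal code in
    DefaultSource; buildDefaultRelation() is a hypothetical stand-in for the
    remaining snapshot/read-optimized cases):

        // Sketch: CDC and incremental queries never use the new file formats.
        if (isCdcQuery) {
          CDCRelation.getCDCRelation(sqlContext, metaClient, parameters)
        } else (tableType, queryType) match {
          case (COPY_ON_WRITE, QUERY_TYPE_INCREMENTAL_OPT_VAL) =>
            new IncrementalRelation(sqlContext, parameters, userSchema, metaClient)
          case (MERGE_ON_READ, QUERY_TYPE_INCREMENTAL_OPT_VAL) =>
            MergeOnReadIncrementalRelation(sqlContext, parameters, metaClient, userSchema)
          case _ =>
            // Snapshot and read-optimized queries may still use the new formats,
            // gated by the fileFormatUtils checks in the diff below.
            buildDefaultRelation()
        }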
---
 .../main/scala/org/apache/hudi/DefaultSource.scala | 41 +++-------------------
 .../hudi/HoodieHadoopFsRelationFactory.scala       |  5 ++-
 2 files changed, 9 insertions(+), 37 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
index 7bdaa9609ef..8491ae46d34 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -240,12 +240,7 @@ object DefaultSource {
     if (metaClient.getCommitsTimeline.filterCompletedInstants.countInstants() == 0) {
       new EmptyRelation(sqlContext, resolveSchema(metaClient, parameters, Some(schema)))
     } else if (isCdcQuery) {
-      if (useNewPaquetFileFormat) {
-          new HoodieMergeOnReadCDCHadoopFsRelationFactory(
-            sqlContext, metaClient, parameters, userSchema, isBootstrap = false).build()
-      } else {
         CDCRelation.getCDCRelation(sqlContext, metaClient, parameters)
-      }
     } else {
       lazy val fileFormatUtils = if ((isMultipleBaseFileFormatsEnabled && !isBootstrappedTable)
         || (useNewPaquetFileFormat
@@ -276,21 +271,8 @@ object DefaultSource {
             resolveBaseFileOnlyRelation(sqlContext, globPaths, userSchema, metaClient, parameters)
           }
 
-        case (COPY_ON_WRITE, QUERY_TYPE_INCREMENTAL_OPT_VAL, false) =>
-          if (fileFormatUtils.isDefined) {
-            new HoodieCopyOnWriteIncrementalHadoopFsRelationFactory(
-              sqlContext, metaClient, parameters, userSchema, isBootstrap = false).build()
-          } else {
-            new IncrementalRelation(sqlContext, parameters, userSchema, metaClient)
-          }
-
-        case (COPY_ON_WRITE, QUERY_TYPE_INCREMENTAL_OPT_VAL, true) =>
-          if (fileFormatUtils.isDefined) {
-            new HoodieCopyOnWriteIncrementalHadoopFsRelationFactory(
-              sqlContext, metaClient, parameters, userSchema, isBootstrap = true).build()
-          } else {
-            new IncrementalRelation(sqlContext, parameters, userSchema, metaClient)
-          }
+        case (COPY_ON_WRITE, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
+          new IncrementalRelation(sqlContext, parameters, userSchema, metaClient)
 
         case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) =>
           if (fileFormatUtils.isDefined) {
@@ -305,24 +287,11 @@ object DefaultSource {
             new HoodieMergeOnReadSnapshotHadoopFsRelationFactory(
              sqlContext, metaClient, parameters, userSchema, isBootstrap = false).build()
           } else {
-            new HoodieBootstrapMORRelation(sqlContext, userSchema, globPaths, metaClient, parameters)
-          }
-
-        case (MERGE_ON_READ, QUERY_TYPE_INCREMENTAL_OPT_VAL, true) =>
-          if (fileFormatUtils.isDefined) {
-            new HoodieMergeOnReadIncrementalHadoopFsRelationFactory(
-              sqlContext, metaClient, parameters, userSchema, isBootstrap = true).build()
-          } else {
-            MergeOnReadIncrementalRelation(sqlContext, parameters, metaClient, userSchema)
+            HoodieBootstrapMORRelation(sqlContext, userSchema, globPaths, metaClient, parameters)
           }
 
-        case (MERGE_ON_READ, QUERY_TYPE_INCREMENTAL_OPT_VAL, false) =>
-          if (fileFormatUtils.isDefined) {
-            new HoodieMergeOnReadIncrementalHadoopFsRelationFactory(
-              sqlContext, metaClient, parameters, userSchema, isBootstrap = false).build()
-          } else {
-            MergeOnReadIncrementalRelation(sqlContext, parameters, metaClient, userSchema)
-          }
+        case (MERGE_ON_READ, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
+          MergeOnReadIncrementalRelation(sqlContext, parameters, metaClient, userSchema)
 
         case (_, _, true) =>
           if (fileFormatUtils.isDefined) {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
index a49fee2b740..908193a2b75 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
@@ -22,6 +22,7 @@ import org.apache.avro.Schema
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapred.JobConf
+import org.apache.hudi.DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT
 import org.apache.hudi.HoodieBaseRelation.{convertToAvroSchema, isSchemaEvolutionEnabledOnRead}
 import org.apache.hudi.HoodieConversionUtils.toScalaOption
 import org.apache.hudi.HoodieFileIndex.getConfigProperties
@@ -244,10 +245,12 @@ class HoodieMergeOnReadSnapshotHadoopFsRelationFactory(override val sqlContext:
     sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema, tableAvroSchema.toString, internalSchemaOpt)),
     metaClient.getTableConfig.getTableName, mergeType, mandatoryFields, true, false, Seq.empty)
 
+  private val isTimeTravelQuery: Boolean = options.contains(TIME_TRAVEL_AS_OF_INSTANT.key())
+
   override def buildFileIndex(): FileIndex = fileIndex
 
   override def buildFileFormat(): FileFormat = {
-    if (fileGroupReaderEnabled) {
+    if (fileGroupReaderEnabled && !isTimeTravelQuery && !isBootstrap) {
       fileGroupReaderBasedFileFormat
    } else if (metaClient.getTableConfig.isMultipleBaseFileFormatsEnabled && !isBootstrap) {
       multipleBaseFileFormat
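
A usage note: with this hunk, HoodieMergeOnReadSnapshotHadoopFsRelationFactory treats a read as time travel whenever TIME_TRAVEL_AS_OF_INSTANT (key "as.of.instant") appears in the read options, and such reads skip the file-group-reader-based format. A minimal, illustrative read (table path and instant are made up):

    // Presence of "as.of.instant" makes isTimeTravelQuery true above, so
    // buildFileFormat() falls through to the non-file-group-reader formats.
    val df = spark.read.format("hudi")
      .option("as.of.instant", "20231109085419000")  // hypothetical instant
      .load("/tmp/hudi_table")                       // hypothetical path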
