This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new c3848195799 [HUDI-7060] Disable new file group reader or new parquet
file format for CDC, time travel, and incremental query types (#10032)
c3848195799 is described below
commit c38481957996606076b7440e18de388cf1d103c1
Author: Lin Liu <[email protected]>
AuthorDate: Thu Nov 9 08:54:19 2023 -0800
[HUDI-7060] Disable new file group reader or new parquet file format for
CDC, time travel, and incremental query types (#10032)
1. For CDC queries, we should not use either of the two file formats, so
we keep using the existing relation.
2. For time_travel queries and bootstrap queries, we use
new_spark_file_format but not file_group_reader_file_format.
3. For incremental queries, we use the existing relations, e.g.,
IncrementalRelation or MergeOnReadIncrementalRelation, to handle them.
---
.../main/scala/org/apache/hudi/DefaultSource.scala | 41 +++-------------------
.../hudi/HoodieHadoopFsRelationFactory.scala | 5 ++-
2 files changed, 9 insertions(+), 37 deletions(-)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
index 7bdaa9609ef..8491ae46d34 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -240,12 +240,7 @@ object DefaultSource {
if (metaClient.getCommitsTimeline.filterCompletedInstants.countInstants()
== 0) {
new EmptyRelation(sqlContext, resolveSchema(metaClient, parameters,
Some(schema)))
} else if (isCdcQuery) {
- if (useNewPaquetFileFormat) {
- new HoodieMergeOnReadCDCHadoopFsRelationFactory(
- sqlContext, metaClient, parameters, userSchema, isBootstrap =
false).build()
- } else {
CDCRelation.getCDCRelation(sqlContext, metaClient, parameters)
- }
} else {
lazy val fileFormatUtils = if ((isMultipleBaseFileFormatsEnabled &&
!isBootstrappedTable)
|| (useNewPaquetFileFormat
@@ -276,21 +271,8 @@ object DefaultSource {
resolveBaseFileOnlyRelation(sqlContext, globPaths, userSchema,
metaClient, parameters)
}
- case (COPY_ON_WRITE, QUERY_TYPE_INCREMENTAL_OPT_VAL, false) =>
- if (fileFormatUtils.isDefined) {
- new HoodieCopyOnWriteIncrementalHadoopFsRelationFactory(
- sqlContext, metaClient, parameters, userSchema, isBootstrap =
false).build()
- } else {
- new IncrementalRelation(sqlContext, parameters, userSchema,
metaClient)
- }
-
- case (COPY_ON_WRITE, QUERY_TYPE_INCREMENTAL_OPT_VAL, true) =>
- if (fileFormatUtils.isDefined) {
- new HoodieCopyOnWriteIncrementalHadoopFsRelationFactory(
- sqlContext, metaClient, parameters, userSchema, isBootstrap =
true).build()
- } else {
- new IncrementalRelation(sqlContext, parameters, userSchema,
metaClient)
- }
+ case (COPY_ON_WRITE, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
+ new IncrementalRelation(sqlContext, parameters, userSchema,
metaClient)
case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) =>
if (fileFormatUtils.isDefined) {
@@ -305,24 +287,11 @@ object DefaultSource {
new HoodieMergeOnReadSnapshotHadoopFsRelationFactory(
sqlContext, metaClient, parameters, userSchema, isBootstrap =
false).build()
} else {
- new HoodieBootstrapMORRelation(sqlContext, userSchema, globPaths,
metaClient, parameters)
- }
-
- case (MERGE_ON_READ, QUERY_TYPE_INCREMENTAL_OPT_VAL, true) =>
- if (fileFormatUtils.isDefined) {
- new HoodieMergeOnReadIncrementalHadoopFsRelationFactory(
- sqlContext, metaClient, parameters, userSchema, isBootstrap =
true).build()
- } else {
- MergeOnReadIncrementalRelation(sqlContext, parameters, metaClient,
userSchema)
+ HoodieBootstrapMORRelation(sqlContext, userSchema, globPaths,
metaClient, parameters)
}
- case (MERGE_ON_READ, QUERY_TYPE_INCREMENTAL_OPT_VAL, false) =>
- if (fileFormatUtils.isDefined) {
- new HoodieMergeOnReadIncrementalHadoopFsRelationFactory(
- sqlContext, metaClient, parameters, userSchema, isBootstrap =
false).build()
- } else {
- MergeOnReadIncrementalRelation(sqlContext, parameters, metaClient,
userSchema)
- }
+ case (MERGE_ON_READ, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
+ MergeOnReadIncrementalRelation(sqlContext, parameters, metaClient,
userSchema)
case (_, _, true) =>
if (fileFormatUtils.isDefined) {
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
index a49fee2b740..908193a2b75 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
@@ -22,6 +22,7 @@ import org.apache.avro.Schema
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.JobConf
+import org.apache.hudi.DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT
import org.apache.hudi.HoodieBaseRelation.{convertToAvroSchema,
isSchemaEvolutionEnabledOnRead}
import org.apache.hudi.HoodieConversionUtils.toScalaOption
import org.apache.hudi.HoodieFileIndex.getConfigProperties
@@ -244,10 +245,12 @@ class
HoodieMergeOnReadSnapshotHadoopFsRelationFactory(override val sqlContext:
sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema,
tableAvroSchema.toString, internalSchemaOpt)),
metaClient.getTableConfig.getTableName, mergeType, mandatoryFields, true,
false, Seq.empty)
+ private val isTimeTravelQuery: Boolean =
options.contains(TIME_TRAVEL_AS_OF_INSTANT.key())
+
override def buildFileIndex(): FileIndex = fileIndex
override def buildFileFormat(): FileFormat = {
- if (fileGroupReaderEnabled) {
+ if (fileGroupReaderEnabled && !isTimeTravelQuery & !isBootstrap) {
fileGroupReaderBasedFileFormat
} else if (metaClient.getTableConfig.isMultipleBaseFileFormatsEnabled &&
!isBootstrap) {
multipleBaseFileFormat