This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new fdc42928c90 [HUDI-7215] Delete NewHoodieParquetFileFormat (#10304)
fdc42928c90 is described below
commit fdc42928c905032fda20a208d5d3a7cdb0e377f7
Author: Jon Vexler <[email protected]>
AuthorDate: Wed Dec 13 16:30:03 2023 -0500
[HUDI-7215] Delete NewHoodieParquetFileFormat (#10304)
Co-authored-by: Jonathan Vexler <=>
---
.../scala/org/apache/hudi/DataSourceOptions.scala | 9 -
.../main/scala/org/apache/hudi/DefaultSource.scala | 66 +---
.../hudi/HoodieHadoopFsRelationFactory.scala | 62 ++--
.../apache/hudi/HoodieSparkFileFormatUtils.scala | 233 -------------
.../parquet/NewHoodieParquetFileFormat.scala | 378 ---------------------
.../hudi/TestHoodieMergeHandleWithSparkMerger.java | 3 -
.../apache/hudi/functional/TestBootstrapRead.java | 6 +-
.../hudi/functional/TestBootstrapReadBase.java | 12 -
.../functional/TestNewHoodieParquetFileFormat.java | 5 -
.../hudi/TestAvroSchemaResolutionSupport.scala | 5 -
.../read/TestHoodieFileGroupReaderOnSpark.scala | 2 -
.../hudi/functional/TestMORDataSourceStorage.scala | 1 -
.../functional/TestPartialUpdateAvroPayload.scala | 3 +-
.../sql/hudi/TestPartialUpdateForMergeInto.scala | 5 -
.../hudi/utilities/sources/HoodieIncrSource.java | 4 +-
...TestHoodieDeltaStreamerSchemaEvolutionBase.java | 3 -
16 files changed, 40 insertions(+), 757 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index ac60606a4bc..d618a604388 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -86,15 +86,6 @@ object DataSourceReadOptions {
s"payload implementation to merge (${REALTIME_PAYLOAD_COMBINE_OPT_VAL})
or skip merging altogether" +
s"${REALTIME_SKIP_MERGE_OPT_VAL}")
- val USE_NEW_HUDI_PARQUET_FILE_FORMAT: ConfigProperty[String] = ConfigProperty
- .key("hoodie.datasource.read.use.new.parquet.file.format")
- .defaultValue("true")
- .markAdvanced()
- .sinceVersion("0.14.0")
- .withDocumentation("Read using the new Hudi parquet file format. The new
Hudi parquet file format is " +
- "introduced as an experimental feature in 0.14.0. Currently, the new
Hudi parquet file format only applies " +
- "to bootstrap and MOR queries. Schema evolution is also not supported by
the new file format.")
-
val READ_PATHS: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.read.paths")
.noDefaultValue()
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
index ac8286b1bde..222a447f9f5 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -21,6 +21,7 @@ import org.apache.hadoop.fs.Path
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL,
OPERATION, STREAMING_CHECKPOINT_IDENTIFIER}
import org.apache.hudi.cdc.CDCRelation
+import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieReaderConfig}
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE,
MERGE_ON_READ}
import org.apache.hudi.common.model.{HoodieTableType, WriteConcurrencyMode}
@@ -235,15 +236,11 @@ object DefaultSource {
Option(schema)
}
- val useNewParquetFileFormat =
- parameters.getOrElse(
- USE_NEW_HUDI_PARQUET_FILE_FORMAT.key,
- USE_NEW_HUDI_PARQUET_FILE_FORMAT.defaultValue).toBoolean &&
- !metaClient.isMetadataTable &&
- (globPaths == null || globPaths.isEmpty) &&
- parameters.getOrElse(REALTIME_MERGE.key(), REALTIME_MERGE.defaultValue())
- .equalsIgnoreCase(REALTIME_PAYLOAD_COMBINE_OPT_VAL)
-
+ val useNewParquetFileFormat =
parameters.getOrDefault(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(),
+
HoodieReaderConfig.FILE_GROUP_READER_ENABLED.defaultValue().toString).toBoolean
&&
+ !metaClient.isMetadataTable && (globPaths == null || globPaths.isEmpty)
&&
+ !parameters.getOrDefault(SCHEMA_EVOLUTION_ENABLED.key(),
SCHEMA_EVOLUTION_ENABLED.defaultValue().toString).toBoolean &&
+ parameters.getOrElse(REALTIME_MERGE.key(),
REALTIME_MERGE.defaultValue()).equalsIgnoreCase(REALTIME_PAYLOAD_COMBINE_OPT_VAL)
if (metaClient.getCommitsTimeline.filterCompletedInstants.countInstants()
== 0) {
new EmptyRelation(sqlContext, resolveSchema(metaClient, parameters,
Some(schema)))
} else if (isCdcQuery) {
@@ -259,57 +256,35 @@ object DefaultSource {
CDCRelation.getCDCRelation(sqlContext, metaClient, parameters)
}
} else {
- lazy val fileFormatUtils = if ((isMultipleBaseFileFormatsEnabled &&
!isBootstrappedTable)
- || (useNewParquetFileFormat)) {
- val formatUtils = new HoodieSparkFileFormatUtils(sqlContext,
metaClient, parameters, userSchema)
- if (formatUtils.hasSchemaOnRead) Option.empty else Some(formatUtils)
- } else {
- Option.empty
- }
-
- if (isMultipleBaseFileFormatsEnabled) {
- if (isBootstrappedTable) {
- throw new HoodieException(s"Multiple base file formats are not
supported for bootstrapped table")
- }
- resolveMultiFileFormatRelation(tableType, queryType,
fileFormatUtils.get)
- }
(tableType, queryType, isBootstrappedTable) match {
case (COPY_ON_WRITE, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) |
(COPY_ON_WRITE, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) |
(MERGE_ON_READ, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) =>
- if (fileFormatUtils.isDefined) {
+ if (useNewParquetFileFormat) {
new HoodieCopyOnWriteSnapshotHadoopFsRelationFactory(
sqlContext, metaClient, parameters, userSchema, isBootstrap =
false).build()
} else {
resolveBaseFileOnlyRelation(sqlContext, globPaths, userSchema,
metaClient, parameters)
}
case (COPY_ON_WRITE, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
- if (fileFormatUtils.isDefined) {
+ if (useNewParquetFileFormat) {
new HoodieCopyOnWriteIncrementalHadoopFsRelationFactory(
sqlContext, metaClient, parameters, userSchema,
isBootstrappedTable).build()
} else {
new IncrementalRelation(sqlContext, parameters, userSchema,
metaClient)
}
- case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) =>
- if (fileFormatUtils.isDefined) {
+ case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, _) =>
+ if (useNewParquetFileFormat) {
new HoodieMergeOnReadSnapshotHadoopFsRelationFactory(
- sqlContext, metaClient, parameters, userSchema, isBootstrap =
false).build()
+ sqlContext, metaClient, parameters, userSchema,
isBootstrappedTable).build()
} else {
new MergeOnReadSnapshotRelation(sqlContext, parameters,
metaClient, globPaths, userSchema)
}
- case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, true) =>
- if (fileFormatUtils.isDefined) {
- new HoodieMergeOnReadSnapshotHadoopFsRelationFactory(
- sqlContext, metaClient, parameters, userSchema, isBootstrap =
true).build()
- } else {
- HoodieBootstrapMORRelation(sqlContext, userSchema, globPaths,
metaClient, parameters)
- }
-
case (MERGE_ON_READ, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
- if (fileFormatUtils.isDefined) {
+ if (useNewParquetFileFormat) {
new HoodieMergeOnReadIncrementalHadoopFsRelationFactory(
sqlContext, metaClient, parameters, userSchema,
isBootstrappedTable).build()
} else {
@@ -317,7 +292,7 @@ object DefaultSource {
}
case (_, _, true) =>
- if (fileFormatUtils.isDefined) {
+ if (useNewParquetFileFormat) {
new HoodieCopyOnWriteSnapshotHadoopFsRelationFactory(
sqlContext, metaClient, parameters, userSchema, isBootstrap =
true).build()
} else {
@@ -368,21 +343,6 @@ object DefaultSource {
}
}
- private def resolveMultiFileFormatRelation(tableType: HoodieTableType,
- queryType: String,
- fileFormatUtils:
HoodieSparkFileFormatUtils): BaseRelation = {
- (tableType, queryType) match {
- case (COPY_ON_WRITE, QUERY_TYPE_SNAPSHOT_OPT_VAL) |
- (COPY_ON_WRITE, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL) =>
- fileFormatUtils.getHadoopFsRelation(isMOR = false, isBootstrap = false)
- case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL) |
- (MERGE_ON_READ, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL) =>
- fileFormatUtils.getHadoopFsRelation(isMOR = true, isBootstrap = false)
- case (_, _) =>
- throw new HoodieException(s"Multiple base file formats not supported
for query type : $queryType for tableType: $tableType")
- }
- }
-
private def resolveSchema(metaClient: HoodieTableMetaClient,
parameters: Map[String, String],
schema: Option[StructType]): StructType = {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
index aee15189545..34088f2e699 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
@@ -34,16 +34,16 @@ import org.apache.hudi.common.util.{ConfigUtils, StringUtils}
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY
import org.apache.hudi.config.HoodieWriteConfig
-import org.apache.hudi.internal.schema.{HoodieSchemaException, InternalSchema}
import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
+import org.apache.hudi.internal.schema.{HoodieSchemaException, InternalSchema}
import org.apache.hudi.metadata.HoodieTableMetadataUtil
import org.apache.spark.sql.catalyst.analysis.Resolver
-import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.sql.catalyst.catalog.BucketSpec
-import
org.apache.spark.sql.execution.datasources.parquet.{HoodieFileGroupReaderBasedParquetFileFormat,
NewHoodieParquetFileFormat, ParquetFileFormat}
+import
org.apache.spark.sql.execution.datasources.parquet.HoodieFileGroupReaderBasedParquetFileFormat
import org.apache.spark.sql.execution.datasources.{FileFormat, FileIndex,
FileStatusCache, HadoopFsRelation, HoodieMultipleBaseFileFormat}
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{SQLContext, SparkSession}
import scala.collection.JavaConverters._
import scala.util.{Failure, Success, Try}
@@ -169,8 +169,6 @@ abstract class HoodieBaseHadoopFsRelationFactory(val sqlContext: SQLContext,
protected lazy val mandatoryFieldsForMerging: Seq[String] =
Seq(recordKeyField) ++ preCombineFieldOpt.map(Seq(_)).getOrElse(Seq())
- protected lazy val fileGroupReaderEnabled: Boolean =
checkIfAConfigurationEnabled(HoodieReaderConfig.FILE_GROUP_READER_ENABLED)
-
protected lazy val shouldUseRecordPosition: Boolean =
checkIfAConfigurationEnabled(HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS)
protected def queryTimestamp: Option[String] =
@@ -234,19 +232,15 @@ class HoodieMergeOnReadSnapshotHadoopFsRelationFactory(override val sqlContext:
override def buildFileIndex(): FileIndex = fileIndex
override def buildFileFormat(): FileFormat = {
- if (fileGroupReaderEnabled) {
- new HoodieFileGroupReaderBasedParquetFileFormat(
- tableState, HoodieTableSchema(tableStructSchema,
tableAvroSchema.toString, internalSchemaOpt),
- metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
- true, isBootstrap, false, shouldUseRecordPosition, Seq.empty)
- } else if (metaClient.getTableConfig.isMultipleBaseFileFormatsEnabled &&
!isBootstrap) {
+ if (metaClient.getTableConfig.isMultipleBaseFileFormatsEnabled &&
!isBootstrap) {
new
HoodieMultipleBaseFileFormat(sparkSession.sparkContext.broadcast(tableState),
sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema,
tableAvroSchema.toString, internalSchemaOpt)),
metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
true, false, Seq.empty)
} else {
- new
NewHoodieParquetFileFormat(sparkSession.sparkContext.broadcast(tableState),
-
sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema,
tableAvroSchema.toString, internalSchemaOpt)),
- metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
true, isBootstrap, false, Seq.empty)
+ new HoodieFileGroupReaderBasedParquetFileFormat(
+ tableState, HoodieTableSchema(tableStructSchema,
tableAvroSchema.toString, internalSchemaOpt),
+ metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
+ true, isBootstrap, false, shouldUseRecordPosition, Seq.empty)
}
}
@@ -281,21 +275,16 @@ class HoodieMergeOnReadIncrementalHadoopFsRelationFactory(override val sqlContex
sparkSession, metaClient, schemaSpec, options,
FileStatusCache.getOrCreate(sparkSession), true, true)
override def buildFileFormat(): FileFormat = {
- if (fileGroupReaderEnabled) {
- new HoodieFileGroupReaderBasedParquetFileFormat(
- tableState, HoodieTableSchema(tableStructSchema,
tableAvroSchema.toString, internalSchemaOpt),
- metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
- true, isBootstrap, true, shouldUseRecordPosition,
fileIndex.getRequiredFilters)
- } else if (metaClient.getTableConfig.isMultipleBaseFileFormatsEnabled &&
!isBootstrap) {
+ if (metaClient.getTableConfig.isMultipleBaseFileFormatsEnabled &&
!isBootstrap) {
new
HoodieMultipleBaseFileFormat(sparkSession.sparkContext.broadcast(tableState),
sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema,
tableAvroSchema.toString, internalSchemaOpt)),
metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
true, true, fileIndex.getRequiredFilters)
} else {
- new
NewHoodieParquetFileFormat(sparkSession.sparkContext.broadcast(tableState),
-
sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema,
tableAvroSchema.toString, internalSchemaOpt)),
+ new HoodieFileGroupReaderBasedParquetFileFormat(
+ tableState, HoodieTableSchema(tableStructSchema,
tableAvroSchema.toString, internalSchemaOpt),
metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
- true, isBootstrap, true, fileIndex.getRequiredFilters)
+ true, isBootstrap, true, shouldUseRecordPosition,
fileIndex.getRequiredFilters)
}
}
}
@@ -318,19 +307,15 @@ class HoodieCopyOnWriteSnapshotHadoopFsRelationFactory(override val sqlContext:
shouldEmbedFileSlices = true)
override def buildFileFormat(): FileFormat = {
- if (fileGroupReaderEnabled) {
- new HoodieFileGroupReaderBasedParquetFileFormat(
- tableState, HoodieTableSchema(tableStructSchema,
tableAvroSchema.toString, internalSchemaOpt),
- metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
- false, isBootstrap, false, shouldUseRecordPosition, Seq.empty)
- } else if (metaClient.getTableConfig.isMultipleBaseFileFormatsEnabled &&
!isBootstrap) {
+ if (metaClient.getTableConfig.isMultipleBaseFileFormatsEnabled &&
!isBootstrap) {
new
HoodieMultipleBaseFileFormat(sparkSession.sparkContext.broadcast(tableState),
sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema,
tableAvroSchema.toString, internalSchemaOpt)),
metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
false, false, Seq.empty)
} else {
- new
NewHoodieParquetFileFormat(sparkSession.sparkContext.broadcast(tableState),
-
sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema,
tableAvroSchema.toString, internalSchemaOpt)),
- metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
false, isBootstrap, false, Seq.empty)
+ new HoodieFileGroupReaderBasedParquetFileFormat(
+ tableState, HoodieTableSchema(tableStructSchema,
tableAvroSchema.toString, internalSchemaOpt),
+ metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
+ false, isBootstrap, false, shouldUseRecordPosition, Seq.empty)
}
}
}
@@ -349,21 +334,16 @@ class HoodieCopyOnWriteIncrementalHadoopFsRelationFactory(override val sqlContex
sparkSession, metaClient, schemaSpec, options,
FileStatusCache.getOrCreate(sparkSession), false, isBootstrap)
override def buildFileFormat(): FileFormat = {
- if (fileGroupReaderEnabled) {
- new HoodieFileGroupReaderBasedParquetFileFormat(
- tableState, HoodieTableSchema(tableStructSchema,
tableAvroSchema.toString, internalSchemaOpt),
- metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
- false, isBootstrap, true, shouldUseRecordPosition,
fileIndex.getRequiredFilters)
- } else if (metaClient.getTableConfig.isMultipleBaseFileFormatsEnabled &&
!isBootstrap) {
+ if (metaClient.getTableConfig.isMultipleBaseFileFormatsEnabled &&
!isBootstrap) {
new
HoodieMultipleBaseFileFormat(sparkSession.sparkContext.broadcast(tableState),
sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema,
tableAvroSchema.toString, internalSchemaOpt)),
metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
false, true, fileIndex.getRequiredFilters)
} else {
- new
NewHoodieParquetFileFormat(sparkSession.sparkContext.broadcast(tableState),
-
sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema,
tableAvroSchema.toString, internalSchemaOpt)),
+ new HoodieFileGroupReaderBasedParquetFileFormat(
+ tableState, HoodieTableSchema(tableStructSchema,
tableAvroSchema.toString, internalSchemaOpt),
metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
- false, isBootstrap, true, fileIndex.getRequiredFilters)
+ false, isBootstrap, true, shouldUseRecordPosition,
fileIndex.getRequiredFilters)
}
}
}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkFileFormatUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkFileFormatUtils.scala
deleted file mode 100644
index fe1645d3b60..00000000000
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkFileFormatUtils.scala
+++ /dev/null
@@ -1,233 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi
-
-import org.apache.avro.Schema
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hudi.HoodieBaseRelation._
-import org.apache.hudi.HoodieConversionUtils.toScalaOption
-import org.apache.hudi.common.config.{ConfigProperty, HoodieReaderConfig}
-import org.apache.hudi.common.model.HoodieRecord
-import org.apache.hudi.common.table.timeline.HoodieTimeline
-import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient,
TableSchemaResolver}
-import org.apache.hudi.common.util.ValidationUtils.checkState
-import org.apache.hudi.common.util.{ConfigUtils, StringUtils}
-import org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY
-import org.apache.hudi.config.HoodieWriteConfig
-import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
-import org.apache.hudi.internal.schema.{HoodieSchemaException, InternalSchema}
-import org.apache.spark.sql.catalyst.analysis.Resolver
-import
org.apache.spark.sql.execution.datasources.parquet.{HoodieFileGroupReaderBasedParquetFileFormat,
NewHoodieParquetFileFormat}
-import org.apache.spark.sql.execution.datasources.{FileStatusCache,
HadoopFsRelation, HoodieMultipleBaseFileFormat}
-import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
-import org.apache.spark.sql.sources.BaseRelation
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.{SQLContext, SparkSession}
-
-import scala.collection.JavaConverters._
-import scala.util.{Failure, Success, Try}
-
-class HoodieSparkFileFormatUtils(val sqlContext: SQLContext,
- val metaClient: HoodieTableMetaClient,
- val optParamsInput: Map[String, String],
- private val schemaSpec: Option[StructType])
extends SparkAdapterSupport {
- protected val sparkSession: SparkSession = sqlContext.sparkSession
-
- protected val optParams: Map[String, String] = optParamsInput.filter(kv =>
!kv._1.equals(DATA_QUERIES_ONLY.key()))
- protected def tableName: String = metaClient.getTableConfig.getTableName
-
- protected lazy val resolver: Resolver =
sparkSession.sessionState.analyzer.resolver
-
- private lazy val metaFieldNames =
HoodieRecord.HOODIE_META_COLUMNS.asScala.toSet
-
- protected lazy val conf: Configuration = new
Configuration(sqlContext.sparkContext.hadoopConfiguration)
- protected lazy val jobConf = new JobConf(conf)
-
- protected lazy val tableConfig: HoodieTableConfig = metaClient.getTableConfig
-
- protected lazy val basePath: Path = metaClient.getBasePathV2
-
- protected lazy val (tableAvroSchema: Schema, internalSchemaOpt:
Option[InternalSchema]) = {
- val schemaResolver = new TableSchemaResolver(metaClient)
- val internalSchemaOpt = if (!isSchemaEvolutionEnabledOnRead(optParams,
sparkSession)) {
- None
- } else {
- Try {
-
specifiedQueryTimestamp.map(schemaResolver.getTableInternalSchemaFromCommitMetadata)
- .getOrElse(schemaResolver.getTableInternalSchemaFromCommitMetadata)
- } match {
- case Success(internalSchemaOpt) => toScalaOption(internalSchemaOpt)
- case Failure(e) =>
- None
- }
- }
-
- val (name, namespace) =
AvroConversionUtils.getAvroRecordNameAndNamespace(tableName)
- val avroSchema = internalSchemaOpt.map { is =>
- AvroInternalSchemaConverter.convert(is, namespace + "." + name)
- } orElse {
- specifiedQueryTimestamp.map(schemaResolver.getTableAvroSchema)
- } orElse {
- schemaSpec.map(s => convertToAvroSchema(s, tableName))
- } getOrElse {
- Try(schemaResolver.getTableAvroSchema) match {
- case Success(schema) => schema
- case Failure(e) =>
- throw new HoodieSchemaException("Failed to fetch schema from the
table")
- }
- }
-
- (avroSchema, internalSchemaOpt)
- }
-
- protected lazy val tableStructSchema: StructType = {
- val converted =
AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema)
- val metaFieldMetadata = sparkAdapter.createCatalystMetadataForMetaField
-
- // NOTE: Here we annotate meta-fields with corresponding metadata such
that Spark (>= 3.2)
- // is able to recognize such fields as meta-fields
- StructType(converted.map { field =>
- if (metaFieldNames.exists(metaFieldName => resolver(metaFieldName,
field.name))) {
- field.copy(metadata = metaFieldMetadata)
- } else {
- field
- }
- })
- }
-
- protected lazy val preCombineFieldOpt: Option[String] =
- Option(tableConfig.getPreCombineField)
- .orElse(optParams.get(DataSourceWriteOptions.PRECOMBINE_FIELD.key))
match {
- // NOTE: This is required to compensate for cases when empty string is
used to stub
- // property value to avoid it being set with the default value
- // TODO(HUDI-3456) cleanup
- case Some(f) if !StringUtils.isNullOrEmpty(f) => Some(f)
- case _ => None
- }
-
- protected def timeline: HoodieTimeline =
- // NOTE: We're including compaction here since it's not considering a
"commit" operation
- metaClient.getCommitsAndCompactionTimeline.filterCompletedInstants
-
- protected lazy val recordKeyField: String =
- if (tableConfig.populateMetaFields()) {
- HoodieRecord.RECORD_KEY_METADATA_FIELD
- } else {
- val keyFields = tableConfig.getRecordKeyFields.get()
- checkState(keyFields.length == 1)
- keyFields.head
- }
-
- private def queryTimestamp: Option[String] =
-
specifiedQueryTimestamp.orElse(toScalaOption(timeline.lastInstant()).map(_.getTimestamp))
-
- protected lazy val specifiedQueryTimestamp: Option[String] =
- optParams.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key)
- .map(HoodieSqlCommonUtils.formatQueryInstant)
-
- private def getConfigValue(config: ConfigProperty[String],
- defaultValueOption: Option[String] =
Option.empty): String = {
- optParams.getOrElse(config.key(),
- sqlContext.getConf(config.key(),
defaultValueOption.getOrElse(config.defaultValue())))
- }
-
- private def checkIfAConfigurationEnabled(config:
ConfigProperty[java.lang.Boolean],
- defaultValueOption: Option[String] =
Option.empty): Boolean = {
- optParams.getOrElse(config.key(),
- sqlContext.getConf(config.key(),
defaultValueOption.getOrElse(String.valueOf(config.defaultValue())))).toBoolean
- }
-
- protected val mergeType: String =
optParams.getOrElse(DataSourceReadOptions.REALTIME_MERGE.key,
- DataSourceReadOptions.REALTIME_MERGE.defaultValue)
-
- protected val shouldExtractPartitionValuesFromPartitionPath: Boolean = {
- // Controls whether partition columns (which are the source for the
partition path values) should
- // be omitted from persistence in the data files. On the read path it
affects whether partition values (values
- // of partition columns) will be read from the data file or extracted from
partition path
- val shouldOmitPartitionColumns =
metaClient.getTableConfig.shouldDropPartitionColumns &&
partitionColumns.nonEmpty
- val shouldExtractPartitionValueFromPath =
-
optParams.getOrElse(DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.key,
-
DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.defaultValue.toString).toBoolean
- val shouldUseBootstrapFastRead =
optParams.getOrElse(DATA_QUERIES_ONLY.key(), "false").toBoolean
-
- shouldOmitPartitionColumns || shouldExtractPartitionValueFromPath ||
shouldUseBootstrapFastRead
- }
-
- protected lazy val mandatoryFieldsForMerging: Seq[String] =
- Seq(recordKeyField) ++ preCombineFieldOpt.map(Seq(_)).getOrElse(Seq())
-
- protected lazy val partitionColumns: Array[String] =
tableConfig.getPartitionFields.orElse(Array.empty)
-
- def hasSchemaOnRead: Boolean = internalSchemaOpt.isDefined
-
- def getHadoopFsRelation(isMOR: Boolean, isBootstrap: Boolean): BaseRelation
= {
- val fileIndex = HoodieFileIndex(
- sparkSession, metaClient, Some(tableStructSchema), optParams,
FileStatusCache.getOrCreate(sparkSession), isMOR, shouldEmbedFileSlices = true)
- val recordMergerImpls =
ConfigUtils.split2List(getConfigValue(HoodieWriteConfig.RECORD_MERGER_IMPLS)).asScala.toList
- val recordMergerStrategy =
getConfigValue(HoodieWriteConfig.RECORD_MERGER_STRATEGY,
- Option(metaClient.getTableConfig.getRecordMergerStrategy))
-
- val fileGroupReaderEnabled =
checkIfAConfigurationEnabled(HoodieReaderConfig.FILE_GROUP_READER_ENABLED)
- val shouldUseRecordPosition =
checkIfAConfigurationEnabled(HoodieWriteConfig.WRITE_RECORD_POSITIONS)
-
- val tableState = // Subset of the state of table's configuration as of at
the time of the query
- HoodieTableState(
- tablePath = basePath.toString,
- latestCommitTimestamp = queryTimestamp,
- recordKeyField = recordKeyField,
- preCombineFieldOpt = preCombineFieldOpt,
- usesVirtualKeys = !tableConfig.populateMetaFields(),
- recordPayloadClassName = tableConfig.getPayloadClass,
- metadataConfig = fileIndex.metadataConfig,
- recordMergerImpls = recordMergerImpls,
- recordMergerStrategy = recordMergerStrategy
- )
-
- val mandatoryFields = if (isMOR) {
- mandatoryFieldsForMerging
- } else {
- Seq.empty
- }
-
- val fileFormat = if (fileGroupReaderEnabled) {
- new HoodieFileGroupReaderBasedParquetFileFormat(
- tableState, HoodieTableSchema(tableStructSchema,
tableAvroSchema.toString, internalSchemaOpt),
- metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
- isMOR, isBootstrap, false, shouldUseRecordPosition, Seq.empty)
- } else if (metaClient.getTableConfig.isMultipleBaseFileFormatsEnabled &&
!isBootstrap) {
- new
HoodieMultipleBaseFileFormat(sparkSession.sparkContext.broadcast(tableState),
-
sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema,
tableAvroSchema.toString, internalSchemaOpt)),
- metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
isMOR, false, Seq.empty)
- } else {
- new
NewHoodieParquetFileFormat(sparkSession.sparkContext.broadcast(tableState),
-
sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema,
tableAvroSchema.toString, internalSchemaOpt)),
- metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
- isMOR, isBootstrap, false, Seq.empty)
- }
-
- HadoopFsRelation(
- location = fileIndex,
- partitionSchema = fileIndex.partitionSchema,
- dataSchema = fileIndex.dataSchema,
- bucketSpec = None,
- fileFormat = fileFormat,
- optParams)(sparkSession)
- }
-}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala
deleted file mode 100644
index 0751d99f79c..00000000000
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala
+++ /dev/null
@@ -1,378 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.spark.sql.execution.datasources.parquet
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import
org.apache.hudi.DataSourceReadOptions.{REALTIME_PAYLOAD_COMBINE_OPT_VAL,
REALTIME_SKIP_MERGE_OPT_VAL}
-import org.apache.hudi.MergeOnReadSnapshotRelation.createPartitionedFile
-import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.model.{BaseFile, FileSlice, HoodieLogFile,
HoodieRecord}
-import org.apache.hudi.common.util.ValidationUtils.checkState
-import org.apache.hudi.{HoodieBaseRelation, HoodiePartitionFileSliceMapping,
HoodieSparkUtils, HoodieTableSchema, HoodieTableState, LogFileIterator,
MergeOnReadSnapshotRelation, RecordMergingFileIterator, SkipMergeIterator,
SparkAdapterSupport}
-import org.apache.spark.broadcast.Broadcast
-import
org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.JoinedRow
-import org.apache.spark.sql.execution.datasources.PartitionedFile
-import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isMetaField
-import org.apache.spark.sql.sources.Filter
-import org.apache.spark.sql.types.{StructField, StructType}
-import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}
-import org.apache.spark.util.SerializableConfiguration
-
-import scala.collection.mutable
-import scala.jdk.CollectionConverters.asScalaIteratorConverter
-
-/**
- * This class does bootstrap and MOR merging so that we can use hadoopfs
relation.
- */
-class NewHoodieParquetFileFormat(tableState: Broadcast[HoodieTableState],
- tableSchema: Broadcast[HoodieTableSchema],
- tableName: String,
- mergeType: String,
- mandatoryFields: Seq[String],
- isMOR: Boolean,
- isBootstrap: Boolean,
- isIncremental: Boolean,
- requiredFilters: Seq[Filter]
- ) extends ParquetFileFormat with
SparkAdapterSupport with HoodieFormatTrait {
-
- def getRequiredFilters: Seq[Filter] = requiredFilters
-
- override def isSplitable(sparkSession: SparkSession,
- options: Map[String, String],
- path: Path): Boolean = {
- false
- }
-
- /**
- * Support batch needs to remain consistent, even if one side of a bootstrap
merge can support
- * while the other side can't
- */
- private var supportBatchCalled = false
- private var supportBatchResult = false
- override def supportBatch(sparkSession: SparkSession, schema: StructType):
Boolean = {
- if (!supportBatchCalled || supportBatchResult) {
- supportBatchCalled = true
- supportBatchResult = !isIncremental && !isMOR &&
super.supportBatch(sparkSession, schema)
- }
- supportBatchResult
- }
-
- override def buildReaderWithPartitionValues(sparkSession: SparkSession,
- dataSchema: StructType,
- partitionSchema: StructType,
- requiredSchema: StructType,
- filters: Seq[Filter],
- options: Map[String, String],
- hadoopConf: Configuration):
PartitionedFile => Iterator[InternalRow] = {
-
- val outputSchema = StructType(requiredSchema.fields ++
partitionSchema.fields)
-
- val requiredSchemaWithMandatory = if (!isMOR ||
MergeOnReadSnapshotRelation.isProjectionCompatible(tableState.value)) {
- //add mandatory fields to required schema
- val added: mutable.Buffer[StructField] = mutable.Buffer[StructField]()
- for (field <- mandatoryFields) {
- if (requiredSchema.getFieldIndex(field).isEmpty) {
- val fieldToAdd =
dataSchema.fields(dataSchema.getFieldIndex(field).get)
- added.append(fieldToAdd)
- }
- }
- val addedFields = StructType(added.toArray)
- StructType(requiredSchema.toArray ++ addedFields.fields)
- } else {
- dataSchema
- }
-
- val requiredSchemaSplits = requiredSchemaWithMandatory.fields.partition(f
=> HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.contains(f.name))
- val requiredMeta = StructType(requiredSchemaSplits._1)
- val requiredWithoutMeta = StructType(requiredSchemaSplits._2)
- val needMetaCols = requiredMeta.nonEmpty
- val needDataCols = requiredWithoutMeta.nonEmpty
- // note: this is only the output of the bootstrap merge if isMOR. If it is
only bootstrap then the
- // output will just be outputSchema
- val bootstrapReaderOutput = StructType(requiredMeta.fields ++
requiredWithoutMeta.fields)
-
- val skeletonReaderAppend = needMetaCols && isBootstrap && !(needDataCols
|| isMOR) && partitionSchema.nonEmpty
- val bootstrapBaseAppend = needDataCols && isBootstrap && !isMOR &&
partitionSchema.nonEmpty
-
- val (baseFileReader, preMergeBaseFileReader, skeletonReader,
bootstrapBaseReader) = buildFileReaders(sparkSession,
- dataSchema, partitionSchema, if (isIncremental)
requiredSchemaWithMandatory else requiredSchema,
- filters, options, hadoopConf, requiredSchemaWithMandatory,
- requiredWithoutMeta, requiredMeta)
-
- val broadcastedHadoopConf = sparkSession.sparkContext.broadcast(new
SerializableConfiguration(hadoopConf))
- (file: PartitionedFile) => {
- file.partitionValues match {
- case fileSliceMapping: HoodiePartitionFileSliceMapping =>
- val filePath =
sparkAdapter.getSparkPartitionedFileUtils.getPathFromPartitionedFile(file)
- if (FSUtils.isLogFile(filePath)) {
- //no base file
- val fileSlice =
fileSliceMapping.getSlice(FSUtils.getFileId(filePath.getName).substring(1)).get
- val logFiles = getLogFilesFromSlice(fileSlice)
- val outputAvroSchema =
HoodieBaseRelation.convertToAvroSchema(outputSchema, tableName)
- new LogFileIterator(logFiles, filePath.getParent,
tableSchema.value, outputSchema, outputAvroSchema,
- tableState.value, broadcastedHadoopConf.value.value)
- } else {
- //We do not broadcast the slice if it has no log files or
bootstrap base
- fileSliceMapping.getSlice(FSUtils.getFileId(filePath.getName))
match {
- case Some(fileSlice) =>
- val hoodieBaseFile = fileSlice.getBaseFile.get()
- val bootstrapFileOpt = hoodieBaseFile.getBootstrapBaseFile
- val partitionValues = fileSliceMapping.getPartitionValues
- val logFiles = if (isMOR) getLogFilesFromSlice(fileSlice) else
List.empty
- if (requiredSchemaWithMandatory.isEmpty) {
- val baseFile = createPartitionedFile(partitionValues,
hoodieBaseFile.getHadoopPath, 0, hoodieBaseFile.getFileLen)
- baseFileReader(baseFile)
- } else if (bootstrapFileOpt.isPresent) {
- val bootstrapIterator =
buildBootstrapIterator(skeletonReader, bootstrapBaseReader,
- skeletonReaderAppend, bootstrapBaseAppend,
bootstrapFileOpt.get(), hoodieBaseFile, partitionValues,
- needMetaCols, needDataCols)
- (isMOR, logFiles.nonEmpty) match {
- case (true, true) =>
buildMergeOnReadIterator(bootstrapIterator, logFiles, filePath.getParent,
- bootstrapReaderOutput, requiredSchemaWithMandatory,
outputSchema, partitionSchema, partitionValues,
- broadcastedHadoopConf.value.value)
- case (true, false) =>
appendPartitionAndProject(bootstrapIterator, bootstrapReaderOutput,
- partitionSchema, outputSchema, partitionValues)
- case (false, false) => bootstrapIterator
- case (false, true) => throw new
IllegalStateException("should not be log files if not mor table")
- }
- } else {
- if (logFiles.nonEmpty) {
- val baseFile = createPartitionedFile(InternalRow.empty,
hoodieBaseFile.getHadoopPath, 0, hoodieBaseFile.getFileLen)
- buildMergeOnReadIterator(preMergeBaseFileReader(baseFile),
logFiles, filePath.getParent, requiredSchemaWithMandatory,
- requiredSchemaWithMandatory, outputSchema,
partitionSchema, partitionValues, broadcastedHadoopConf.value.value)
- } else {
- throw new IllegalStateException("should not be here since
file slice should not have been broadcasted since it has no log or data files")
- //baseFileReader(baseFile)
- }
- }
- case _ if isIncremental =>
- projectSchema(baseFileReader(file),
StructType(requiredSchemaWithMandatory.fields ++ partitionSchema.fields),
outputSchema)
- case _ => baseFileReader(file)
- }
- }
- case _ if isIncremental =>
- projectSchema(baseFileReader(file),
StructType(requiredSchemaWithMandatory.fields ++ partitionSchema.fields),
outputSchema)
- case _ => baseFileReader(file)
- }
- }
- }
-
- /**
- * Build file readers to read individual physical files
- */
- protected def buildFileReaders(sparkSession: SparkSession, dataSchema:
StructType, partitionSchema: StructType,
- requiredSchema: StructType, filters: Seq[Filter],
options: Map[String, String],
- hadoopConf: Configuration, requiredSchemaWithMandatory:
StructType,
- requiredWithoutMeta: StructType, requiredMeta:
StructType):
- (PartitionedFile => Iterator[InternalRow],
- PartitionedFile => Iterator[InternalRow],
- PartitionedFile => Iterator[InternalRow],
- PartitionedFile => Iterator[InternalRow]) = {
-
- //file reader when you just read a hudi parquet file and don't do any
merging
- val baseFileReader = if (isIncremental) {
- super.buildReaderWithPartitionValues(sparkSession, dataSchema,
partitionSchema, requiredSchemaWithMandatory,
- filters ++ requiredFilters, options, new Configuration(hadoopConf))
- } else {
- super.buildReaderWithPartitionValues(sparkSession, dataSchema,
partitionSchema, requiredSchema,
- filters ++ requiredFilters, options, new Configuration(hadoopConf))
- }
-
- //file reader for reading a hudi base file that needs to be merged with
log files
- val recordKeyRelatedFilters = getRecordKeyRelatedFilters(filters,
tableState.value.recordKeyField)
- val preMergeBaseFileReader = if (isMOR) {
- super.buildReaderWithPartitionValues(sparkSession, dataSchema,
StructType(Seq.empty),
- requiredSchemaWithMandatory, requiredFilters ++
recordKeyRelatedFilters, options, new Configuration(hadoopConf))
- } else {
- _: PartitionedFile => Iterator.empty
- }
-
- //Rules for appending partitions and filtering in the bootstrap readers:
- // 1. if it is mor, we don't want to filter data or append partitions
- // 2. if we need to merge the bootstrap base and skeleton files then we
cannot filter
- // 3. if we need to merge the bootstrap base and skeleton files then we
should never append partitions to the
- // skeleton reader
-
- val needMetaCols = requiredMeta.nonEmpty
- val needDataCols = requiredWithoutMeta.nonEmpty
-
- //file reader for bootstrap skeleton files
- val skeletonReader = if (needMetaCols && isBootstrap) {
- if (needDataCols || isMOR) {
- // no filter and no append
- super.buildReaderWithPartitionValues(sparkSession,
HoodieSparkUtils.getMetaSchema, StructType(Seq.empty),
- requiredMeta, Seq.empty, options, new Configuration(hadoopConf))
- } else {
- // filter and append
- super.buildReaderWithPartitionValues(sparkSession,
HoodieSparkUtils.getMetaSchema, partitionSchema,
- requiredMeta, filters, options, new Configuration(hadoopConf))
- }
- } else {
- _: PartitionedFile => Iterator.empty
- }
-
- //file reader for bootstrap base files
- val bootstrapBaseReader = if (needDataCols && isBootstrap) {
- val dataSchemaWithoutMeta = StructType(dataSchema.fields.filterNot(sf =>
isMetaField(sf.name)))
- if (isMOR) {
- // no filter and no append
- super.buildReaderWithPartitionValues(sparkSession,
dataSchemaWithoutMeta, StructType(Seq.empty), requiredWithoutMeta,
- Seq.empty, options, new Configuration(hadoopConf))
- } else if (needMetaCols) {
- // no filter but append
- super.buildReaderWithPartitionValues(sparkSession,
dataSchemaWithoutMeta, partitionSchema, requiredWithoutMeta,
- Seq.empty, options, new Configuration(hadoopConf))
- } else {
- // filter and append
- super.buildReaderWithPartitionValues(sparkSession,
dataSchemaWithoutMeta, partitionSchema, requiredWithoutMeta,
- filters ++ requiredFilters, options, new Configuration(hadoopConf))
- }
- } else {
- _: PartitionedFile => Iterator.empty
- }
-
- (baseFileReader, preMergeBaseFileReader, skeletonReader,
bootstrapBaseReader)
- }
-
- /**
- * Create iterator for a file slice that has bootstrap base and skeleton file
- */
- protected def buildBootstrapIterator(skeletonReader: PartitionedFile =>
Iterator[InternalRow],
- bootstrapBaseReader: PartitionedFile =>
Iterator[InternalRow],
- skeletonReaderAppend: Boolean,
bootstrapBaseAppend: Boolean,
- bootstrapBaseFile: BaseFile, hoodieBaseFile:
BaseFile,
- partitionValues: InternalRow, needMetaCols:
Boolean,
- needDataCols: Boolean): Iterator[InternalRow] = {
- lazy val skeletonFile = if (skeletonReaderAppend) {
- createPartitionedFile(partitionValues, hoodieBaseFile.getHadoopPath, 0,
hoodieBaseFile.getFileLen)
- } else {
- createPartitionedFile(InternalRow.empty, hoodieBaseFile.getHadoopPath,
0, hoodieBaseFile.getFileLen)
- }
-
- lazy val dataFile = if (bootstrapBaseAppend) {
- createPartitionedFile(partitionValues, bootstrapBaseFile.getHadoopPath,
0, bootstrapBaseFile.getFileLen)
- } else {
- createPartitionedFile(InternalRow.empty,
bootstrapBaseFile.getHadoopPath, 0, bootstrapBaseFile.getFileLen)
- }
-
- lazy val skeletonIterator = skeletonReader(skeletonFile)
- lazy val dataFileIterator = bootstrapBaseReader(dataFile)
-
- (needMetaCols, needDataCols) match {
- case (true, true) => doBootstrapMerge(skeletonIterator, dataFileIterator)
- case (true, false) => skeletonIterator
- case (false, true) => dataFileIterator
- case (false, false) => throw new IllegalStateException("should not be
here if only partition cols are required")
- }
- }
-
- /**
- * Merge skeleton and data file iterators
- */
- protected def doBootstrapMerge(skeletonFileIterator: Iterator[Any],
dataFileIterator: Iterator[Any]): Iterator[InternalRow] = {
- new Iterator[Any] {
- val combinedRow = new JoinedRow()
-
- override def hasNext: Boolean = {
- checkState(dataFileIterator.hasNext == skeletonFileIterator.hasNext,
- "Bootstrap data-file iterator and skeleton-file iterator have to be
in-sync!")
- dataFileIterator.hasNext && skeletonFileIterator.hasNext
- }
-
- override def next(): Any = {
- (skeletonFileIterator.next(), dataFileIterator.next()) match {
- case (s: ColumnarBatch, d: ColumnarBatch) =>
- val numCols = s.numCols() + d.numCols()
- val vecs: Array[ColumnVector] = new Array[ColumnVector](numCols)
- for (i <- 0 until numCols) {
- if (i < s.numCols()) {
- vecs(i) = s.column(i)
- } else {
- vecs(i) = d.column(i - s.numCols())
- }
- }
- assert(s.numRows() == d.numRows())
- sparkAdapter.makeColumnarBatch(vecs, s.numRows())
- case (_: ColumnarBatch, _: InternalRow) => throw new
IllegalStateException("InternalRow ColumnVector mismatch")
- case (_: InternalRow, _: ColumnarBatch) => throw new
IllegalStateException("InternalRow ColumnVector mismatch")
- case (s: InternalRow, d: InternalRow) => combinedRow(s, d)
- }
- }
- }.asInstanceOf[Iterator[InternalRow]]
- }
-
- /**
- * Create iterator for a file slice that has log files
- */
- protected def buildMergeOnReadIterator(iter: Iterator[InternalRow],
logFiles: List[HoodieLogFile],
- partitionPath: Path, inputSchema: StructType,
requiredSchemaWithMandatory: StructType,
- outputSchema: StructType, partitionSchema:
StructType, partitionValues: InternalRow,
- hadoopConf: Configuration):
Iterator[InternalRow] = {
-
- val requiredAvroSchema =
HoodieBaseRelation.convertToAvroSchema(requiredSchemaWithMandatory, tableName)
- val morIterator = mergeType match {
- case REALTIME_SKIP_MERGE_OPT_VAL => throw new
UnsupportedOperationException("Skip merge is not currently " +
- "implemented for the New Hudi Parquet File format")
- //new SkipMergeIterator(logFiles, partitionPath, iter, inputSchema,
tableSchema.value,
- // requiredSchemaWithMandatory, requiredAvroSchema, tableState.value,
hadoopConf)
- case REALTIME_PAYLOAD_COMBINE_OPT_VAL =>
- new RecordMergingFileIterator(logFiles, partitionPath, iter,
inputSchema, tableSchema.value,
- requiredSchemaWithMandatory, requiredAvroSchema, tableState.value,
hadoopConf)
- }
- appendPartitionAndProject(morIterator, requiredSchemaWithMandatory,
partitionSchema,
- outputSchema, partitionValues)
- }
-
- /**
- * Append partition values to rows and project to output schema
- */
- protected def appendPartitionAndProject(iter: Iterator[InternalRow],
- inputSchema: StructType,
- partitionSchema: StructType,
- to: StructType,
- partitionValues: InternalRow):
Iterator[InternalRow] = {
- if (partitionSchema.isEmpty) {
- projectSchema(iter, inputSchema, to)
- } else {
- val unsafeProjection =
generateUnsafeProjection(StructType(inputSchema.fields ++
partitionSchema.fields), to)
- val joinedRow = new JoinedRow()
- iter.map(d => unsafeProjection(joinedRow(d, partitionValues)))
- }
- }
-
- protected def projectSchema(iter: Iterator[InternalRow],
- from: StructType,
- to: StructType): Iterator[InternalRow] = {
- val unsafeProjection = generateUnsafeProjection(from, to)
- iter.map(d => unsafeProjection(d))
- }
-
- protected def getLogFilesFromSlice(fileSlice: FileSlice):
List[HoodieLogFile] = {
-
fileSlice.getLogFiles.sorted(HoodieLogFile.getLogFileComparator).iterator().asScala.toList
- }
-
- protected def getRecordKeyRelatedFilters(filters: Seq[Filter],
recordKeyColumn: String): Seq[Filter] = {
- filters.filter(f => f.references.exists(c =>
c.equalsIgnoreCase(recordKeyColumn)))
- }
-}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodieMergeHandleWithSparkMerger.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodieMergeHandleWithSparkMerger.java
index 9847918adc1..2eb38eedaeb 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodieMergeHandleWithSparkMerger.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodieMergeHandleWithSparkMerger.java
@@ -240,9 +240,6 @@ public class TestHoodieMergeHandleWithSparkMerger extends SparkClientFunctionalT
properties.put(
FILE_GROUP_READER_ENABLED.key(),
"true");
- properties.put(
- DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT().key(),
- "true");
properties.put(
WRITE_RECORD_POSITIONS.key(),
"true");
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java
index 301b651ea69..d926a3be5a4 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java
@@ -20,11 +20,15 @@ package org.apache.hudi.functional;
import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
+import java.util.Map;
import java.util.stream.Stream;
import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE;
@@ -60,7 +64,6 @@ public class TestBootstrapRead extends TestBootstrapReadBase {
@ParameterizedTest
@MethodSource("testArgs")
public void testBootstrapFunctional(String bootstrapType, Boolean
dashPartitions, HoodieTableType tableType, Integer nPartitions) {
- /*
this.bootstrapType = bootstrapType;
this.dashPartitions = dashPartitions;
this.tableType = tableType;
@@ -86,6 +89,5 @@ public class TestBootstrapRead extends TestBootstrapReadBase {
doInsert(options, "002");
compareTables();
verifyMetaColOnlyRead(2);
- */
}
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapReadBase.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapReadBase.java
index d3246f7c4da..d4db2e3eb2f 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapReadBase.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapReadBase.java
@@ -49,11 +49,8 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
-import static
org.apache.hudi.DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT;
-import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE;
import static org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ;
import static
org.apache.hudi.common.testutils.RawTripTestPayload.recordToString;
-import static org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY;
import static org.junit.jupiter.api.Assertions.assertEquals;
@Tag("functional")
@@ -185,21 +182,12 @@ public abstract class TestBootstrapReadBase extends HoodieSparkClientTestBase {
protected void compareTables() {
Dataset<Row> hudiDf =
sparkSession.read().format("hudi").load(hudiBasePath);
Dataset<Row> bootstrapDf =
sparkSession.read().format("hudi").load(bootstrapTargetPath);
- Dataset<Row> fastBootstrapDf =
sparkSession.read().format("hudi").option(DATA_QUERIES_ONLY.key(),
"true").load(bootstrapTargetPath);
- boolean shouldTestFastBootstrap = tableType.equals(COPY_ON_WRITE) &&
!Boolean.parseBoolean(USE_NEW_HUDI_PARQUET_FILE_FORMAT().defaultValue());
if (nPartitions == 0) {
compareDf(hudiDf.drop(dropColumns), bootstrapDf.drop(dropColumns));
- if (shouldTestFastBootstrap) {
- compareDf(fastBootstrapDf.drop("city_to_state"),
bootstrapDf.drop(dropColumns).drop("_hoodie_partition_path"));
- }
return;
}
compareDf(hudiDf.drop(dropColumns).drop(partitionCols),
bootstrapDf.drop(dropColumns).drop(partitionCols));
compareDf(hudiDf.select("_row_key",partitionCols),
bootstrapDf.select("_row_key",partitionCols));
- if (shouldTestFastBootstrap) {
- compareDf(fastBootstrapDf.drop("city_to_state").drop(partitionCols),
bootstrapDf.drop(dropColumns).drop("_hoodie_partition_path").drop(partitionCols));
- compareDf(fastBootstrapDf.select("_row_key",partitionCols),
bootstrapDf.select("_row_key",partitionCols));
- }
}
protected void verifyMetaColOnlyRead(Integer iteration) {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestNewHoodieParquetFileFormat.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestNewHoodieParquetFileFormat.java
index efc803666e5..ce462c93d1b 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestNewHoodieParquetFileFormat.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestNewHoodieParquetFileFormat.java
@@ -18,7 +18,6 @@
package org.apache.hudi.functional;
-import org.apache.hudi.DataSourceReadOptions;
import org.apache.hudi.common.config.HoodieReaderConfig;
import org.apache.hudi.common.model.HoodieTableType;
@@ -104,11 +103,9 @@ public class TestNewHoodieParquetFileFormat extends TestBootstrapReadBase {
protected void testCount(String tableBasePath) {
Dataset<Row> legacyDf = sparkSession.read().format("hudi")
-
.option(DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT().key(), "false")
.option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "false")
.load(tableBasePath);
Dataset<Row> fileFormatDf = sparkSession.read().format("hudi")
-
.option(DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT().key(), "true")
.option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true")
.load(tableBasePath);
assertEquals(legacyDf.count(), fileFormatDf.count());
@@ -124,11 +121,9 @@ public class TestNewHoodieParquetFileFormat extends TestBootstrapReadBase {
protected void runIndividualComparison(String tableBasePath, String
firstColumn, String... columns) {
Dataset<Row> legacyDf = sparkSession.read().format("hudi")
-
.option(DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT().key(), "false")
.option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "false")
.load(tableBasePath);
Dataset<Row> fileFormatDf = sparkSession.read().format("hudi")
-
.option(DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT().key(), "true")
.option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true")
.load(tableBasePath);
if (firstColumn.isEmpty()) {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala
index 6bf7b7b28bb..6e22db914d3 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala
@@ -149,7 +149,6 @@ class TestAvroSchemaResolutionSupport extends HoodieClientTestBase with ScalaAss
// NOTE: type promotion is not supported for the custom file
format and the filegroup reader
// HUDI-7045 and PR#10007 in progress to fix the issue
.option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(),
"false")
-
.option(DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT.key(), "false")
.load(tempRecordPath)
readDf.printSchema()
readDf.show(false)
@@ -392,7 +391,6 @@ class TestAvroSchemaResolutionSupport extends HoodieClientTestBase with ScalaAss
// NOTE: long to int type change is not supported for the custom file
format and the filegroup reader
// HUDI-7045 and PR#10007 in progress to fix the issue
.option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "false")
- .option(DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT.key(),
"false")
.load(tempRecordPath)
readDf.printSchema()
readDf.show(false)
@@ -489,7 +487,6 @@ class TestAvroSchemaResolutionSupport extends HoodieClientTestBase with ScalaAss
// NOTE: type promotion is not supported for the custom file format and
the filegroup reader
// HUDI-7045 and PR#10007 in progress to fix the issue
.option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "false")
- .option(DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT.key(),
"false")
.load(tempRecordPath)
readDf.printSchema()
readDf.show(false)
@@ -556,7 +553,6 @@ class TestAvroSchemaResolutionSupport extends HoodieClientTestBase with ScalaAss
// NOTE: type promotion is not supported for the custom file format and
the filegroup reader
// HUDI-7045 and PR#10007 in progress to fix the issue
.option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "false")
- .option(DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT.key(),
"false")
.load(tempRecordPath)
readDf.printSchema()
readDf.show(false)
@@ -827,7 +823,6 @@ class TestAvroSchemaResolutionSupport extends HoodieClientTestBase with ScalaAss
// NOTE: type promotion is not supported for the custom file format and
the filegroup reader
// HUDI-7045 and PR#10007 in progress to fix the issue
.option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "false")
- .option(DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT.key(),
"false")
.load(tempRecordPath)
readDf.printSchema()
readDf.show(false)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
index 5c1d04ee477..78b880be386 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
@@ -21,7 +21,6 @@ package org.apache.hudi.common.table.read
import org.apache.avro.Schema
import org.apache.hadoop.conf.Configuration
-import org.apache.hudi.DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT
import
org.apache.hudi.common.config.HoodieReaderConfig.FILE_GROUP_READER_ENABLED
import org.apache.hudi.common.engine.HoodieReaderContext
import org.apache.hudi.common.fs.FSUtils
@@ -111,7 +110,6 @@ class TestHoodieFileGroupReaderOnSpark extends TestHoodieFileGroupReaderBase[Int
schema: Schema,
fileGroupId: String): Unit = {
val expectedDf = spark.read.format("hudi")
- .option(USE_NEW_HUDI_PARQUET_FILE_FORMAT.key(), "false")
.option(FILE_GROUP_READER_ENABLED.key(), "false")
.load(basePath)
.where(col(HoodieRecord.FILENAME_METADATA_FIELD).contains(fileGroupId))
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
index acf406c9fbc..c82d9ebe635 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
@@ -71,7 +71,6 @@ class TestMORDataSourceStorage extends SparkClientFunctionalTestHarness {
options += (DataSourceWriteOptions.PRECOMBINE_FIELD.key() ->
preCombineField)
}
if (useFileGroupReader) {
- options += (DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT.key()
-> String.valueOf(useFileGroupReader))
options += (HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key() ->
String.valueOf(useFileGroupReader))
}
val dataGen = new HoodieTestDataGenerator(0xDEEF)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartialUpdateAvroPayload.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartialUpdateAvroPayload.scala
index 1e605b092bf..40871997b28 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartialUpdateAvroPayload.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartialUpdateAvroPayload.scala
@@ -27,7 +27,7 @@ import org.apache.hudi.common.util
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.testutils.HoodieClientTestBase
import org.apache.hudi.util.JFunction
-import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions,
QuickstartUtils}
+import org.apache.hudi.{DataSourceWriteOptions, QuickstartUtils}
import org.apache.spark.sql._
import org.apache.spark.sql.functions.{lit, typedLit}
import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
@@ -123,7 +123,6 @@ class TestPartialUpdateAvroPayload extends HoodieClientTestBase {
.save(basePath)
val finalDF = spark.read.format("hudi")
- .option(DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT.key(),
String.valueOf(useFileGroupReader))
.option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(),
String.valueOf(useFileGroupReader))
.load(basePath)
assertEquals(finalDF.select("rider").collectAsList().get(0).getString(0),
upsert1DF.select("rider").collectAsList().get(0).getString(0))
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestPartialUpdateForMergeInto.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestPartialUpdateForMergeInto.scala
index 25aa955b811..0c1ca31479d 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestPartialUpdateForMergeInto.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestPartialUpdateForMergeInto.scala
@@ -18,7 +18,6 @@
package org.apache.spark.sql.hudi
import org.apache.avro.Schema
-import org.apache.hudi.DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT
import org.apache.hudi.DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES
import org.apache.hudi.HoodieSparkUtils
import org.apache.hudi.avro.HoodieAvroUtils
@@ -75,7 +74,6 @@ class TestPartialUpdateForMergeInto extends
HoodieSparkSqlTestBase {
spark.sql(s"set ${MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key} = 0")
spark.sql(s"set ${ENABLE_MERGE_INTO_PARTIAL_UPDATES.key} = true")
spark.sql(s"set ${FILE_GROUP_READER_ENABLED.key} = true")
- spark.sql(s"set ${USE_NEW_HUDI_PARQUET_FILE_FORMAT.key} = true")
// Create a table with five data fields
spark.sql(
@@ -124,7 +122,6 @@ class TestPartialUpdateForMergeInto extends
HoodieSparkSqlTestBase {
spark.sql(s"set ${MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key} = 0")
spark.sql(s"set ${ENABLE_MERGE_INTO_PARTIAL_UPDATES.key} = true")
spark.sql(s"set ${FILE_GROUP_READER_ENABLED.key} = true")
- spark.sql(s"set ${USE_NEW_HUDI_PARQUET_FILE_FORMAT.key} = true")
// Write inserts to log block
spark.sql(s"set ${INDEX_TYPE.key} = INMEMORY")
@@ -183,7 +180,6 @@ class TestPartialUpdateForMergeInto extends
HoodieSparkSqlTestBase {
spark.sql(s"set ${ENABLE_MERGE_INTO_PARTIAL_UPDATES.key} = true")
spark.sql(s"set ${LOGFILE_DATA_BLOCK_FORMAT.key} = $logDataBlockFormat")
spark.sql(s"set ${FILE_GROUP_READER_ENABLED.key} = true")
- spark.sql(s"set ${USE_NEW_HUDI_PARQUET_FILE_FORMAT.key} = true")
// Create a table with five data fields
spark.sql(
@@ -287,7 +283,6 @@ class TestPartialUpdateForMergeInto extends
HoodieSparkSqlTestBase {
spark.sql(s"set ${ENABLE_MERGE_INTO_PARTIAL_UPDATES.key} = true")
spark.sql(s"set ${LOGFILE_DATA_BLOCK_FORMAT.key} = $logDataBlockFormat")
spark.sql(s"set ${FILE_GROUP_READER_ENABLED.key} = true")
- spark.sql(s"set ${USE_NEW_HUDI_PARQUET_FILE_FORMAT.key} = true")
// Create a table with five data fields
spark.sql(
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
index f87e5c231bf..2b52bda04f2 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
@@ -62,9 +62,7 @@ public class HoodieIncrSource extends RowSource {
private static final Logger LOG =
LoggerFactory.getLogger(HoodieIncrSource.class);
public static final Set<String> HOODIE_INCR_SOURCE_READ_OPT_KEYS =
- CollectionUtils.createImmutableSet(
- "hoodie.datasource.read.use.new.parquet.file.format",
- HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key());
+
CollectionUtils.createImmutableSet(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key());
private final Option<SnapshotLoadQuerySplitter> snapshotLoadQuerySplitter;
public static class Config {
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java
index 42c485ea2e2..6b364667054 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java
@@ -20,7 +20,6 @@
package org.apache.hudi.utilities.deltastreamer;
import org.apache.hudi.AvroConversionUtils;
-import org.apache.hudi.DataSourceReadOptions;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.HoodieSparkUtils;
import org.apache.hudi.avro.HoodieAvroUtils;
@@ -96,7 +95,6 @@ public class TestHoodieDeltaStreamerSchemaEvolutionBase
extends HoodieDeltaStrea
protected Map<String, String> readOpts = new HashMap<String, String>() {
{
- put(DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT().key(),
"false");
put(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "false");
}
};
@@ -146,7 +144,6 @@ public class TestHoodieDeltaStreamerSchemaEvolutionBase
extends HoodieDeltaStrea
protected HoodieDeltaStreamer.Config getDeltaStreamerConfig(String[]
transformerClasses, boolean nullForDeletedCols,
TypedProperties
extraProps) throws IOException {
-
extraProps.setProperty(DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT().key(),
"false");
extraProps.setProperty(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(),
"false");
extraProps.setProperty("hoodie.datasource.write.table.type", tableType);
extraProps.setProperty("hoodie.datasource.write.row.writer.enable",
rowWriterEnable.toString());