This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 0c0c402a9a03 refactor: Use HoodieFileGroupReader paths for all Spark Datasource reads (#17457)
0c0c402a9a03 is described below
commit 0c0c402a9a03061155b5c43f1f4cdd37726eacfd
Author: Tim Brown <[email protected]>
AuthorDate: Sun Dec 7 17:05:51 2025 -0500
refactor: Use HoodieFileGroupReader paths for all Spark Datasource reads (#17457)
---
.../main/scala/org/apache/hudi/DefaultSource.scala | 34 +-
.../org/apache/hudi/HoodieBootstrapMORRDD.scala | 88 ---
.../apache/hudi/HoodieBootstrapMORRelation.scala | 101 ----
.../scala/org/apache/hudi/HoodieBootstrapRDD.scala | 130 -----
.../org/apache/hudi/HoodieBootstrapRelation.scala | 266 ---------
.../hudi/HoodieHadoopFsRelationFactory.scala | 5 +-
.../org/apache/hudi/HoodieMergeOnReadRDDV1.scala | 166 ------
.../org/apache/hudi/HoodieMergeOnReadRDDV2.scala | 12 +-
.../src/main/scala/org/apache/hudi/Iterators.scala | 405 --------------
.../hudi/MergeOnReadIncrementalRelationV1.scala | 10 +-
.../org/apache/hudi/cdc/CDCFileGroupIterator.scala | 26 +-
.../scala/org/apache/hudi/cdc/CDCRelation.scala | 217 --------
.../apache/hudi/{ => cdc}/HoodieCDCFileIndex.scala | 62 ++-
.../scala/org/apache/hudi/cdc/HoodieCDCRDD.scala | 602 ---------------------
.../sql/FileFormatUtilsForFileGroupReader.scala | 7 +-
.../HoodieFileGroupReaderBasedFileFormat.scala | 4 +-
.../sql/hudi/streaming/HoodieStreamSourceV1.scala | 27 +-
.../sql/hudi/streaming/HoodieStreamSourceV2.scala | 27 +-
18 files changed, 134 insertions(+), 2055 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
index 1520cd1bd34d..363aa116f7fa 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -19,23 +19,21 @@ package org.apache.hudi
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL,
OPERATION, STREAMING_CHECKPOINT_IDENTIFIER}
-import org.apache.hudi.cdc.CDCRelation
+import org.apache.hudi.cdc.HoodieCDCFileIndex
import org.apache.hudi.common.HoodieSchemaNotFoundException
-import org.apache.hudi.common.config.{HoodieReaderConfig, HoodieStorageConfig}
+import org.apache.hudi.common.config.HoodieStorageConfig
import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE,
MERGE_ON_READ}
import org.apache.hudi.common.model.WriteConcurrencyMode
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient,
HoodieTableVersion, TableSchemaResolver}
-import org.apache.hudi.common.table.log.InstantRange.RangeType
import org.apache.hudi.common.util.{ConfigUtils, TablePathUtils}
import org.apache.hudi.common.util.ValidationUtils.checkState
-import org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY
import org.apache.hudi.config.HoodieWriteConfig.{WRITE_CONCURRENCY_MODE,
WRITE_TABLE_VERSION}
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hadoop.fs.HadoopFSUtils
import org.apache.hudi.io.storage.HoodieSparkIOFactory
import org.apache.hudi.storage.{HoodieStorageUtils, StoragePath}
import org.apache.hudi.storage.hadoop.HoodieHadoopStorage
-import org.apache.hudi.util.{SparkConfigUtils}
+import org.apache.hudi.util.SparkConfigUtils
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession, SQLContext}
import org.apache.spark.sql.execution.streaming.{Sink, Source}
@@ -304,16 +302,12 @@ object DefaultSource {
if
(metaClient.getCommitsTimeline.filterCompletedInstants.countInstants() == 0) {
new EmptyRelation(sqlContext, resolveSchema(metaClient, parameters,
Some(schema)))
} else if (isCdcQuery) {
- if (isNotMetadataTable) {
- if (tableType == COPY_ON_WRITE) {
- new HoodieCopyOnWriteCDCHadoopFsRelationFactory(
- sqlContext, metaClient, parameters, userSchema, isBootstrap =
false).build()
- } else {
- new HoodieMergeOnReadCDCHadoopFsRelationFactory(
- sqlContext, metaClient, parameters, userSchema, isBootstrap =
false).build()
- }
+ if (tableType == COPY_ON_WRITE) {
+ new HoodieCopyOnWriteCDCHadoopFsRelationFactory(
+ sqlContext, metaClient, parameters, userSchema, isBootstrap =
false).build()
} else {
- CDCRelation.getCDCRelation(sqlContext, metaClient, parameters)
+ new HoodieMergeOnReadCDCHadoopFsRelationFactory(
+ sqlContext, metaClient, parameters, userSchema, isBootstrap =
false).build()
}
} else {
@@ -345,12 +339,8 @@ object DefaultSource {
}
case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, true) =>
- if (isNotMetadataTable) {
- new HoodieMergeOnReadSnapshotHadoopFsRelationFactory(
- sqlContext, metaClient, parameters, userSchema, isBootstrap =
true).build()
- } else {
- HoodieBootstrapMORRelation(sqlContext, userSchema, metaClient,
parameters)
- }
+ new HoodieMergeOnReadSnapshotHadoopFsRelationFactory(
+ sqlContext, metaClient, parameters, userSchema, isBootstrap =
true).build()
case (MERGE_ON_READ, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
if (hoodieTableSupportsCompletionTime) {
@@ -394,11 +384,11 @@ object DefaultSource {
private def resolveSchema(metaClient: HoodieTableMetaClient,
parameters: Map[String, String],
schema: Option[StructType]): StructType = {
- val isCdcQuery = CDCRelation.isCDCEnabled(metaClient) &&
+ val isCdcQuery = HoodieCDCFileIndex.isCDCEnabled(metaClient) &&
parameters.get(QUERY_TYPE.key).contains(QUERY_TYPE_INCREMENTAL_OPT_VAL)
&&
parameters.get(INCREMENTAL_FORMAT.key).contains(INCREMENTAL_FORMAT_CDC_VAL)
if (isCdcQuery) {
- CDCRelation.FULL_CDC_SPARK_SCHEMA
+ HoodieCDCFileIndex.FULL_CDC_SPARK_SCHEMA
} else {
val schemaResolver = new TableSchemaResolver(metaClient)
try {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapMORRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapMORRDD.scala
deleted file mode 100644
index f298ca849107..000000000000
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapMORRDD.scala
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi
-
-import org.apache.hudi.HoodieBaseRelation.BaseFileReader
-import org.apache.hudi.HoodieBootstrapMORRDD.{getPartitionPath,
CONFIG_INSTANTIATION_LOCK}
-import
org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
-import org.apache.hudi.storage.StoragePath
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.mapred.JobConf
-import org.apache.spark.{Partition, SerializableWritable, TaskContext}
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.datasources.PartitionedFile
-
-class HoodieBootstrapMORRDD(@transient spark: SparkSession,
- @transient config: Configuration,
- bootstrapDataFileReader: BaseFileReader,
- bootstrapSkeletonFileReader: BaseFileReader,
- regularFileReader: BaseFileReader,
- tableSchema: HoodieTableSchema,
- requiredSchema: HoodieTableSchema,
- tableState: HoodieTableState,
- @transient splits: Seq[BaseHoodieBootstrapSplit])
- extends HoodieBootstrapRDD(spark, bootstrapDataFileReader,
bootstrapSkeletonFileReader,
- regularFileReader, requiredSchema, splits) {
-
- protected val maxCompactionMemoryInBytes: Long =
getMaxCompactionMemoryInBytes(new JobConf(config))
-
- private val hadoopConfBroadcast = spark.sparkContext.broadcast(new
SerializableWritable(config))
-
- override def compute(split: Partition, context: TaskContext):
Iterator[InternalRow] = {
- val bootstrapPartition = split.asInstanceOf[HoodieBootstrapPartition]
- maybeLog(bootstrapPartition)
- val bootstrapMORSplit =
bootstrapPartition.split.asInstanceOf[HoodieBootstrapMORSplit]
-
- if (bootstrapMORSplit.logFiles.isEmpty) {
- //no log files, treat like regular bootstrap
- getIterator(bootstrapPartition)
- } else {
- bootstrapMORSplit.skeletonFile match {
- case Some(skeletonFile) =>
- val (iterator, schema) =
getSkeletonIteratorSchema(bootstrapMORSplit.dataFile, skeletonFile)
- new RecordMergingFileIterator(bootstrapMORSplit.logFiles,
getPartitionPath(skeletonFile),
- iterator, schema, tableSchema, requiredSchema, tableState,
getHadoopConf)
- case _ =>
- // NOTE: Regular file-reader is already projected into the required
schema
- new RecordMergingFileIterator(bootstrapMORSplit.logFiles,
- getPartitionPath(bootstrapMORSplit.dataFile),
- regularFileReader.read(bootstrapMORSplit.dataFile),
regularFileReader.schema, tableSchema,
- requiredSchema, tableState, getHadoopConf)
- }
- }
- }
-
- private def getHadoopConf: Configuration = {
- val conf = hadoopConfBroadcast.value.value
- // TODO clean up, this lock is unnecessary see HoodieMergeOnReadRDD
- CONFIG_INSTANTIATION_LOCK.synchronized {
- new Configuration(conf)
- }
- }
-}
-
-object HoodieBootstrapMORRDD extends SparkAdapterSupport {
- val CONFIG_INSTANTIATION_LOCK = new Object()
-
- def getPartitionPath(file: PartitionedFile): StoragePath = {
-
sparkAdapter.getSparkPartitionedFileUtils.getPathFromPartitionedFile(file).getParent
- }
-}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapMORRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapMORRelation.scala
deleted file mode 100644
index 60ae8b722478..000000000000
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapMORRelation.scala
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi
-
-import org.apache.hudi.common.model.{FileSlice, HoodieLogFile}
-import org.apache.hudi.common.table.HoodieTableMetaClient
-import org.apache.hudi.storage.StoragePath
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.execution.datasources.PartitionedFile
-import org.apache.spark.sql.sources.Filter
-import org.apache.spark.sql.types.StructType
-
-import scala.collection.JavaConverters._
-
-case class HoodieBootstrapMORSplit(dataFile: PartitionedFile, skeletonFile:
Option[PartitionedFile],
- logFiles: List[HoodieLogFile]) extends
BaseHoodieBootstrapSplit
-
-/**
- * This is Spark relation that can be used for querying metadata/fully
bootstrapped query hoodie tables, as well as
- * non-bootstrapped tables. It implements PrunedFilteredScan interface in
order to support column pruning and filter
- * push-down. For metadata bootstrapped files, if we query columns from both
metadata and actual data then it will
- * perform a merge of both to return the result.
- *
- * Caveat: Filter push-down does not work when querying both metadata and
actual data columns over metadata
- * bootstrapped files, because then the metadata file and data file can return
different number of rows causing errors
- * merging.
- *
- * @param sqlContext Spark SQL Context
- * @param userSchema User specified schema in the datasource query
- * @param globPaths The global paths to query. If it not none, read from the
globPaths,
- * else read data from tablePath using HoodieFileIndex.
- * @param metaClient Hoodie table meta client
- * @param optParams DataSource options passed by the user
- */
-case class HoodieBootstrapMORRelation(override val sqlContext: SQLContext,
- private val userSchema:
Option[StructType],
- override val metaClient:
HoodieTableMetaClient,
- override val optParams: Map[String,
String],
- private val prunedDataSchema:
Option[StructType] = None)
- extends BaseHoodieBootstrapRelation(sqlContext, userSchema, metaClient,
- optParams, prunedDataSchema) {
-
- override type Relation = HoodieBootstrapMORRelation
-
- protected lazy val mandatoryFieldsForMerging: Seq[String] =
- Seq(recordKeyField) ++ orderingFields
-
- override lazy val mandatoryFields: Seq[String] = mandatoryFieldsForMerging
-
- protected override def getFileSlices(partitionFilters: Seq[Expression],
dataFilters: Seq[Expression]): Seq[FileSlice] = {
- fileIndex.listFileSlices(HoodieFileIndex.
- convertFilterForTimestampKeyGenerator(metaClient,
partitionFilters)).values.flatten.toSeq
- }
-
- protected override def createFileSplit(fileSlice: FileSlice, dataFile:
PartitionedFile, skeletonFile: Option[PartitionedFile]): FileSplit = {
- HoodieBootstrapMORSplit(dataFile, skeletonFile,
fileSlice.getLogFiles.sorted(HoodieLogFile.getLogFileComparator).iterator().asScala.toList)
- }
-
- protected override def composeRDD(fileSplits: Seq[FileSplit],
- tableSchema: HoodieTableSchema,
- requiredSchema: HoodieTableSchema,
- requestedColumns: Array[String],
- filters: Array[Filter]): RDD[InternalRow]
= {
-
- val (bootstrapDataFileReader, bootstrapSkeletonFileReader,
regularFileReader) = getFileReaders(tableSchema,
- requiredSchema, requestedColumns, filters)
- new HoodieBootstrapMORRDD(
- sqlContext.sparkSession,
- config = jobConf,
- bootstrapDataFileReader = bootstrapDataFileReader,
- bootstrapSkeletonFileReader = bootstrapSkeletonFileReader,
- regularFileReader = regularFileReader,
- tableSchema = tableSchema,
- requiredSchema = requiredSchema,
- tableState = tableState, fileSplits)
- }
-
-
- override def updatePrunedDataSchema(prunedSchema: StructType): Relation =
- this.copy(prunedDataSchema = Some(prunedSchema))
-}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRDD.scala
deleted file mode 100644
index fcac20bf9b97..000000000000
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRDD.scala
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi
-
-import org.apache.hudi.HoodieBaseRelation.BaseFileReader
-import org.apache.hudi.common.util.ValidationUtils.checkState
-
-import org.apache.spark.{Partition, TaskContext}
-import org.apache.spark.rdd.RDD
-import
org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.JoinedRow
-import org.apache.spark.sql.execution.datasources.PartitionedFile
-import org.apache.spark.sql.types.StructType
-
-class HoodieBootstrapRDD(@transient spark: SparkSession,
- bootstrapDataFileReader: BaseFileReader,
- bootstrapSkeletonFileReader: BaseFileReader,
- regularFileReader: BaseFileReader,
- requiredSchema: HoodieTableSchema,
- @transient splits: Seq[BaseHoodieBootstrapSplit])
- extends RDD[InternalRow](spark.sparkContext, Nil) {
-
-
- protected def getSkeletonIteratorSchema(dataFile: PartitionedFile,
skeletonFile: PartitionedFile): (Iterator[InternalRow], StructType) = {
- if (bootstrapDataFileReader.schema.isEmpty) {
- // No data column to fetch, hence fetch only from skeleton file
- (bootstrapSkeletonFileReader.read(skeletonFile),
bootstrapSkeletonFileReader.schema)
- } else if (bootstrapSkeletonFileReader.schema.isEmpty) {
- // No metadata column to fetch, hence fetch only from data file
- (bootstrapDataFileReader.read(dataFile), bootstrapDataFileReader.schema)
- } else {
- // Fetch from both data and skeleton file, and merge
- val dataFileIterator = bootstrapDataFileReader.read(dataFile)
- val skeletonFileIterator = bootstrapSkeletonFileReader.read(skeletonFile)
- val mergedSchema = StructType(bootstrapSkeletonFileReader.schema.fields
++ bootstrapDataFileReader.schema.fields)
-
- (merge(skeletonFileIterator, dataFileIterator), mergedSchema)
- }
- }
-
- /**
- * Here we have to project the [[InternalRow]]s fetched into the expected
target schema.
- * These could diverge for ex, when requested schema contains partition
columns which might not be
- * persisted w/in the data file, but instead would be parsed from the
partition path. In that case
- * output of the file-reader will have different ordering of the fields
than the original required
- * schema (for more details please check out [[ParquetFileFormat]]
implementation).
- */
- protected def unsafeProjectIterator(iterator: Iterator[InternalRow], schema:
StructType): Iterator[InternalRow] = {
- val unsafeProjection = generateUnsafeProjection(schema,
requiredSchema.structTypeSchema)
- iterator.map(unsafeProjection)
- }
-
- protected def maybeLog(bootstrapPartition: HoodieBootstrapPartition): Unit =
{
- if (log.isDebugEnabled) {
- var msg = "Got Split => Index: " + bootstrapPartition.index + ", Data
File: " +
- bootstrapPartition.split.dataFile.filePath
- if (bootstrapPartition.split.skeletonFile.isDefined) {
- msg += ", Skeleton File: " +
bootstrapPartition.split.skeletonFile.get.filePath
- }
- logDebug(msg)
- }
- }
-
- protected def getIterator(bootstrapPartition: HoodieBootstrapPartition):
Iterator[InternalRow] = {
- bootstrapPartition.split.skeletonFile match {
- case Some(skeletonFile) =>
- // It is a bootstrap split. Check both skeleton and data files.
- val (iterator, schema) =
getSkeletonIteratorSchema(bootstrapPartition.split.dataFile, skeletonFile)
- unsafeProjectIterator(iterator, schema)
- case _ =>
- // NOTE: Regular file-reader is already projected into the required
schema
- regularFileReader.read(bootstrapPartition.split.dataFile)
- }
- }
-
- override def compute(split: Partition, context: TaskContext):
Iterator[InternalRow] = {
- val bootstrapPartition = split.asInstanceOf[HoodieBootstrapPartition]
- maybeLog(bootstrapPartition)
- getIterator(bootstrapPartition)
- }
-
- def merge(skeletonFileIterator: Iterator[InternalRow], dataFileIterator:
Iterator[InternalRow]): Iterator[InternalRow] = {
- new Iterator[InternalRow] {
- private val combinedRow = new JoinedRow()
-
- override def hasNext: Boolean = {
- checkState(dataFileIterator.hasNext == skeletonFileIterator.hasNext,
- "Bootstrap data-file iterator and skeleton-file iterator have to be
in-sync!")
- dataFileIterator.hasNext && skeletonFileIterator.hasNext
- }
-
- override def next(): InternalRow = {
- combinedRow(skeletonFileIterator.next(), dataFileIterator.next())
- }
- }
- }
-
- override protected def getPartitions: Array[Partition] = {
- splits.zipWithIndex.map(file => {
- if (file._1.skeletonFile.isDefined) {
- logDebug("Forming partition with => Index: " + file._2 + ", Files: " +
file._1.dataFile.filePath
- + "," + file._1.skeletonFile.get.filePath)
- HoodieBootstrapPartition(file._2, file._1)
- } else {
- logDebug("Forming partition with => Index: " + file._2 + ", File: " +
file._1.dataFile.filePath)
- HoodieBootstrapPartition(file._2, file._1)
- }
- }).toArray
- }
-}
-
-case class HoodieBootstrapPartition(index: Int, split:
BaseHoodieBootstrapSplit) extends Partition
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala
deleted file mode 100644
index 99d43508d7f2..000000000000
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala
+++ /dev/null
@@ -1,266 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi
-
-import org.apache.hudi.HoodieBaseRelation.{convertToAvroSchema, projectReader,
BaseFileReader}
-import org.apache.hudi.HoodieBootstrapRelation.{createPartitionedFile,
validate}
-import org.apache.hudi.common.model.FileSlice
-import org.apache.hudi.common.table.HoodieTableMetaClient
-import org.apache.hudi.common.util.ValidationUtils.checkState
-import org.apache.hudi.storage.StoragePath
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.execution.datasources.{HadoopFsRelation,
PartitionedFile}
-import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isMetaField
-import org.apache.spark.sql.sources.Filter
-import org.apache.spark.sql.types.StructType
-
-trait BaseHoodieBootstrapSplit extends HoodieFileSplit {
- val dataFile: PartitionedFile
- val skeletonFile: Option[PartitionedFile]
-}
-
-case class HoodieBootstrapSplit(dataFile: PartitionedFile,
- skeletonFile: Option[PartitionedFile]) extends
BaseHoodieBootstrapSplit
-
-case class HoodieBootstrapRelation(override val sqlContext: SQLContext,
- private val userSchema: Option[StructType],
- override val metaClient:
HoodieTableMetaClient,
- override val optParams: Map[String, String],
- private val prunedDataSchema:
Option[StructType] = None)
- extends BaseHoodieBootstrapRelation(sqlContext, userSchema, metaClient,
optParams, prunedDataSchema) {
-
- override type Relation = HoodieBootstrapRelation
-
- override protected def createFileSplit(fileSlice: FileSlice, dataFile:
PartitionedFile, skeletonFile: Option[PartitionedFile]): FileSplit = {
- HoodieBootstrapSplit(dataFile, skeletonFile)
- }
-
- override def updatePrunedDataSchema(prunedSchema: StructType): Relation =
- this.copy(prunedDataSchema = Some(prunedSchema))
-
- def toHadoopFsRelation: HadoopFsRelation = {
- HadoopFsRelation(
- location = fileIndex,
- partitionSchema = fileIndex.partitionSchema,
- dataSchema = fileIndex.dataSchema,
- bucketSpec = None,
- fileFormat = fileFormat,
- optParams)(sparkSession)
- }
-}
-/**
- * This is Spark relation that can be used for querying metadata/fully
bootstrapped query hoodie tables, as well as
- * non-bootstrapped tables. It implements PrunedFilteredScan interface in
order to support column pruning and filter
- * push-down. For metadata bootstrapped files, if we query columns from both
metadata and actual data then it will
- * perform a merge of both to return the result.
- *
- * Caveat: Filter push-down does not work when querying both metadata and
actual data columns over metadata
- * bootstrapped files, because then the metadata file and data file can return
different number of rows causing errors
- * merging.
- *
- * @param sqlContext Spark SQL Context
- * @param userSchema User specified schema in the datasource query
- * @param globPaths The global paths to query. If it not none, read from the
globPaths,
- * else read data from tablePath using HoodieFileIndex.
- * @param metaClient Hoodie table meta client
- * @param optParams DataSource options passed by the user
- */
-abstract class BaseHoodieBootstrapRelation(override val sqlContext: SQLContext,
- private val userSchema:
Option[StructType],
- override val metaClient:
HoodieTableMetaClient,
- override val optParams: Map[String,
String],
- private val prunedDataSchema:
Option[StructType] = None)
- extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema,
prunedDataSchema) {
-
- override type FileSplit = BaseHoodieBootstrapSplit
-
- private lazy val skeletonSchema = HoodieSparkUtils.getMetaSchema
-
- override lazy val mandatoryFields: Seq[String] = Seq.empty
-
- protected def getFileSlices(partitionFilters: Seq[Expression], dataFilters:
Seq[Expression]): Seq[FileSlice] = {
- listLatestFileSlices(partitionFilters, dataFilters)
- }
-
- protected def createFileSplit(fileSlice: FileSlice, dataFile:
PartitionedFile, skeletonFile: Option[PartitionedFile]): FileSplit
-
- protected override def collectFileSplits(partitionFilters: Seq[Expression],
dataFilters: Seq[Expression]): Seq[FileSplit] = {
- val fileSlices = getFileSlices(partitionFilters, dataFilters)
- val isPartitioned = metaClient.getTableConfig.isTablePartitioned
- fileSlices.map { fileSlice =>
- val baseFile = fileSlice.getBaseFile.get()
- if (baseFile.getBootstrapBaseFile.isPresent) {
- val partitionValues =
getPartitionColumnsAsInternalRowInternal(baseFile.getPathInfo,
- metaClient.getBasePath, extractPartitionValuesFromPartitionPath =
isPartitioned)
- val dataFile = createPartitionedFile(
- partitionValues,
baseFile.getBootstrapBaseFile.get.getPathInfo.getPath,
- 0, baseFile.getBootstrapBaseFile.get().getFileLen)
- val skeletonFile = Option(createPartitionedFile(
- InternalRow.empty, baseFile.getStoragePath, 0, baseFile.getFileLen))
-
- createFileSplit(fileSlice, dataFile, skeletonFile)
- } else {
- val dataFile = createPartitionedFile(
- getPartitionColumnsAsInternalRow(baseFile.getPathInfo),
baseFile.getStoragePath, 0, baseFile.getFileLen)
- createFileSplit(fileSlice, dataFile, Option.empty)
- }
- }
- }
-
- /**
- * get all the file readers required for composeRDD
- */
- protected def getFileReaders(tableSchema: HoodieTableSchema,
- requiredSchema: HoodieTableSchema,
- requestedColumns: Array[String],
- filters: Array[Filter]): (BaseFileReader,
BaseFileReader, BaseFileReader) = {
- val requiredSkeletonFileSchema =
- StructType(skeletonSchema.filter(f => requestedColumns.exists(col =>
resolver(f.name, col))))
-
- val (bootstrapDataFileReader, bootstrapSkeletonFileReader) =
- createBootstrapFileReaders(tableSchema, requiredSchema,
requiredSkeletonFileSchema, filters)
-
- val regularFileReader = createRegularFileReader(tableSchema,
requiredSchema, filters)
- (bootstrapDataFileReader, bootstrapSkeletonFileReader, regularFileReader)
- }
-
- protected override def composeRDD(fileSplits: Seq[FileSplit],
- tableSchema: HoodieTableSchema,
- requiredSchema: HoodieTableSchema,
- requestedColumns: Array[String],
- filters: Array[Filter]): RDD[InternalRow]
= {
-
- val (bootstrapDataFileReader, bootstrapSkeletonFileReader,
regularFileReader) = getFileReaders(tableSchema,
- requiredSchema, requestedColumns, filters)
- new HoodieBootstrapRDD(sqlContext.sparkSession, bootstrapDataFileReader,
bootstrapSkeletonFileReader, regularFileReader,
- requiredSchema, fileSplits)
- }
-
- /**
- * Creates skeleton and base file reader
- */
- private def createBootstrapFileReaders(tableSchema: HoodieTableSchema,
- requiredSchema: HoodieTableSchema,
- requiredSkeletonFileSchema:
StructType,
- filters: Array[Filter]):
(BaseFileReader, BaseFileReader) = {
- // NOTE: "Data" schema in here refers to the whole table's schema that
doesn't include only partition
- // columns, as opposed to data file schema not including any
meta-fields columns in case of
- // Bootstrap relation
- val (partitionSchema, dataSchema, requiredDataSchema) =
- tryPrunePartitionColumnsInternal(tableSchema, requiredSchema,
extractPartitionValuesFromPartitionPath = true)
-
- val bootstrapDataFileSchema =
StructType(dataSchema.structTypeSchema.filterNot(sf => isMetaField(sf.name)))
- val requiredBootstrapDataFileSchema =
StructType(requiredDataSchema.structTypeSchema.filterNot(sf =>
isMetaField(sf.name)))
-
- validate(requiredDataSchema, requiredBootstrapDataFileSchema,
requiredSkeletonFileSchema)
-
- val bootstrapDataFileReader = createBaseFileReader(
- spark = sqlContext.sparkSession,
- dataSchema = new HoodieTableSchema(bootstrapDataFileSchema,
convertToAvroSchema(bootstrapDataFileSchema, tableName).toString),
- partitionSchema = partitionSchema,
- requiredDataSchema = new
HoodieTableSchema(requiredBootstrapDataFileSchema,
convertToAvroSchema(requiredBootstrapDataFileSchema, tableName).toString),
- // NOTE: For bootstrapped files we can't apply any filtering in case
we'd need to merge it with
- // a skeleton-file as we rely on matching ordering of the records
across bootstrap- and skeleton-files
- filters = if (requiredSkeletonFileSchema.isEmpty) filters else Seq(),
- options = optParams,
- hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf(),
- // NOTE: Bootstrap relation have to always extract partition values from
the partition-path as this is a
- // default Spark behavior: Spark by default strips
partition-columns from the data schema and does
- // NOT persist them in the data files, instead parsing them from
partition-paths (on the fly) whenever
- // table is queried
- shouldAppendPartitionValuesOverride = Some(true)
- )
-
- val boostrapSkeletonFileReader = createBaseFileReader(
- spark = sqlContext.sparkSession,
- dataSchema = new HoodieTableSchema(skeletonSchema,
convertToAvroSchema(skeletonSchema, tableName).toString),
- // NOTE: Here we specify partition-schema as empty since we don't need
Spark to inject partition-values
- // parsed from the partition-path
- partitionSchema = StructType(Seq.empty),
- requiredDataSchema = new HoodieTableSchema(requiredSkeletonFileSchema,
convertToAvroSchema(requiredSkeletonFileSchema, tableName).toString),
- // NOTE: For bootstrapped files we can't apply any filtering in case
we'd need to merge it with
- // a skeleton-file as we rely on matching ordering of the records
across bootstrap- and skeleton-files
- filters = if (requiredBootstrapDataFileSchema.isEmpty) filters else
Seq(),
- options = optParams,
- hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf(),
- // NOTE: We override Spark to avoid injecting partition values into the
records read from
- // skeleton-file
- shouldAppendPartitionValuesOverride = Some(false)
- )
-
- (bootstrapDataFileReader, boostrapSkeletonFileReader)
- }
-
- /**
- * create reader for hudi base files
- */
- private def createRegularFileReader(tableSchema: HoodieTableSchema,
- requiredSchema: HoodieTableSchema,
- filters: Array[Filter]): BaseFileReader =
{
- // NOTE: "Data" schema in here refers to the whole table's schema that
doesn't include only partition
- // columns, as opposed to data file schema not including any
meta-fields columns in case of
- // Bootstrap relation
- val (partitionSchema, dataSchema, requiredDataSchema) =
- tryPrunePartitionColumns(tableSchema, requiredSchema)
-
- // NOTE: Bootstrapped table allows Hudi created file-slices to be
co-located w/ the "bootstrapped"
- // ones (ie persisted by Spark). Therefore to be able to read the
data from Bootstrapped
- // table we also need to create regular file-reader to read
file-slices created by Hudi
- val regularFileReader = createBaseFileReader(
- spark = sqlContext.sparkSession,
- dataSchema = dataSchema,
- partitionSchema = partitionSchema,
- requiredDataSchema = requiredDataSchema,
- filters = filters,
- options = optParams,
- hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
- )
-
- // NOTE: In some case schema of the reader's output (reader's schema)
might not match the schema expected by the caller.
- // This could occur for ex, when requested schema contains partition
columns which might not be persisted w/in the
- // data file, but instead would be parsed from the partition path.
In that case output of the file-reader will have
- // different ordering of the fields than the original required
schema (for more details please check out
- // [[ParquetFileFormat]] impl). In that case we have to project the
rows from the file-reader's schema
- // back into the one expected by the caller
- projectReader(regularFileReader, requiredSchema.structTypeSchema)
- }
-}
-
-object HoodieBootstrapRelation extends SparkAdapterSupport {
- def validate(requiredDataSchema: HoodieTableSchema, requiredDataFileSchema:
StructType, requiredSkeletonFileSchema: StructType): Unit = {
- val requiredDataColumns: Seq[String] =
requiredDataSchema.structTypeSchema.fieldNames.toSeq
- val combinedColumns = (requiredSkeletonFileSchema.fieldNames ++
requiredDataFileSchema.fieldNames).toSeq
-
- // NOTE: Here we validate that all required data columns are covered by
the combination of the columns
- // from both skeleton file and the corresponding data file
- checkState(combinedColumns.sorted == requiredDataColumns.sorted)
- }
-
- def createPartitionedFile(partitionValues: InternalRow,
- filePath: StoragePath,
- start: Long,
- length: Long): PartitionedFile = {
- sparkAdapter.getSparkPartitionedFileUtils.createPartitionedFile(
- partitionValues, filePath, start, length)
- }
-}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
index bad337c18527..de6341c8caef 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
@@ -20,6 +20,7 @@ package org.apache.hudi
import org.apache.hudi.HoodieBaseRelation.{convertToAvroSchema,
isSchemaEvolutionEnabledOnRead}
import org.apache.hudi.HoodieConversionUtils.toScalaOption
+import org.apache.hudi.cdc.HoodieCDCFileIndex
import org.apache.hudi.common.config.HoodieReaderConfig
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient,
TableSchemaResolver}
@@ -348,7 +349,7 @@ class HoodieMergeOnReadCDCHadoopFsRelationFactory(override
val sqlContext: SQLCo
override def buildFileIndex(): HoodieFileIndex = hoodieCDCFileIndex
- override def buildDataSchema(): StructType =
hoodieCDCFileIndex.cdcRelation.schema
+ override def buildDataSchema(): StructType =
HoodieCDCFileIndex.FULL_CDC_SPARK_SCHEMA
override def buildPartitionSchema(): StructType = StructType(Nil)
@@ -448,7 +449,7 @@ class HoodieCopyOnWriteCDCHadoopFsRelationFactory(override
val sqlContext: SQLCo
sparkSession, metaClient, schemaSpec, options, fileStatusCache, false,
rangeType)
override def buildFileIndex(): HoodieFileIndex = hoodieCDCFileIndex
- override def buildDataSchema(): StructType =
hoodieCDCFileIndex.cdcRelation.schema
+ override def buildDataSchema(): StructType =
HoodieCDCFileIndex.FULL_CDC_SPARK_SCHEMA
override def buildPartitionSchema(): StructType = StructType(Nil)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDDV1.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDDV1.scala
deleted file mode 100644
index 872c3fe32991..000000000000
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDDV1.scala
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hudi
-
-import org.apache.hudi.HoodieBaseRelation.{projectReader, BaseFileReader}
-import org.apache.hudi.HoodieMergeOnReadRDDV1.CONFIG_INSTANTIATION_LOCK
-import org.apache.hudi.MergeOnReadSnapshotRelation.isProjectionCompatible
-import org.apache.hudi.common.model.HoodieRecord
-import org.apache.hudi.common.util.StringUtils
-import org.apache.hudi.exception.HoodieException
-import
org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.mapred.JobConf
-import org.apache.spark.{Partition, SerializableWritable, SparkContext,
TaskContext}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
-
-import java.io.Closeable
-import java.util.function.Predicate
-
-/**
- * RDD enabling Hudi's Merge-on-Read (MOR) semantic
- *
- * @param sc spark's context
- * @param config hadoop configuration
- * @param fileReaders suite of base file readers
- * @param tableSchema table's full schema
- * @param requiredSchema expected (potentially) projected schema
- * @param tableState table's state
- * @param mergeType type of merge performed
- * @param fileSplits target file-splits this RDD will be iterating over
- * @param includeStartTime whether to include the commit with the commitTime
- * @param startTimestamp start timestamp to filter records
- * @param endTimestamp end timestamp to filter records
- */
-class HoodieMergeOnReadRDDV1(@transient sc: SparkContext,
- @transient config: Configuration,
- fileReaders: HoodieMergeOnReadBaseFileReaders,
- tableSchema: HoodieTableSchema,
- requiredSchema: HoodieTableSchema,
- tableState: HoodieTableState,
- mergeType: String,
- @transient fileSplits:
Seq[HoodieMergeOnReadFileSplit],
- includeStartTime: Boolean = false,
- startTimestamp: String = null,
- endTimestamp: String = null)
- extends RDD[InternalRow](sc, Nil) with HoodieUnsafeRDD {
-
- protected val maxCompactionMemoryInBytes: Long =
getMaxCompactionMemoryInBytes(new JobConf(config))
-
- private val hadoopConfBroadcast = sc.broadcast(new
SerializableWritable(config))
-
- override def compute(split: Partition, context: TaskContext):
Iterator[InternalRow] = {
- val partition = split.asInstanceOf[HoodieMergeOnReadPartition]
- val iter = partition.split match {
- case dataFileOnlySplit if dataFileOnlySplit.logFiles.isEmpty =>
- val projectedReader =
projectReader(fileReaders.requiredSchemaReaderSkipMerging,
requiredSchema.structTypeSchema)
- projectedReader(dataFileOnlySplit.dataFile.get)
-
- case logFileOnlySplit if logFileOnlySplit.dataFile.isEmpty =>
- new LogFileIterator(logFileOnlySplit, tableSchema, requiredSchema,
tableState, getHadoopConf)
-
- case split =>
- mergeType match {
- case DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL =>
- val reader = fileReaders.requiredSchemaReaderSkipMerging
- new SkipMergeIterator(split, reader, tableSchema, requiredSchema,
tableState, getHadoopConf)
-
- case DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL =>
- val reader = pickBaseFileReader()
- new RecordMergingFileIterator(split, reader, tableSchema,
requiredSchema, tableState, getHadoopConf)
-
- case _ => throw new UnsupportedOperationException(s"Not supported
merge type ($mergeType)")
- }
-
- case _ => throw new HoodieException(s"Unable to select an Iterator to
read the Hoodie MOR File Split for " +
- s"file path: ${partition.split.dataFile.get.filePath}" +
- s"log paths: ${partition.split.logFiles.toString}" +
- s"hoodie table path: ${tableState.tablePath}" +
- s"spark partition Index: ${partition.index}" +
- s"merge type: ${mergeType}")
- }
-
- if (iter.isInstanceOf[Closeable]) {
- // register a callback to close logScanner which will be executed on
task completion.
- // when tasks finished, this method will be called, and release
resources.
- Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ =>
iter.asInstanceOf[Closeable].close()))
- }
-
- val commitTimeMetadataFieldIdx =
requiredSchema.structTypeSchema.fieldNames.indexOf(HoodieRecord.COMMIT_TIME_METADATA_FIELD)
- val needsFiltering = commitTimeMetadataFieldIdx >= 0 &&
!StringUtils.isNullOrEmpty(startTimestamp) &&
!StringUtils.isNullOrEmpty(endTimestamp)
- if (needsFiltering) {
- val filterT: Predicate[InternalRow] =
getCommitTimeFilter(includeStartTime, commitTimeMetadataFieldIdx)
- iter.filter(filterT.test)
- }
- else {
- iter
- }
- }
-
- private def getCommitTimeFilter(includeStartTime: Boolean,
commitTimeMetadataFieldIdx: Int): Predicate[InternalRow] = {
- if (includeStartTime) {
- new Predicate[InternalRow] {
- override def test(row: InternalRow): Boolean = {
- val commitTime = row.getString(commitTimeMetadataFieldIdx)
- commitTime >= startTimestamp && commitTime <= endTimestamp
- }
- }
- } else {
- new Predicate[InternalRow] {
- override def test(row: InternalRow): Boolean = {
- val commitTime = row.getString(commitTimeMetadataFieldIdx)
- commitTime > startTimestamp && commitTime <= endTimestamp
- }
- }
- }
- }
-
- private def pickBaseFileReader(): BaseFileReader = {
- // NOTE: This is an optimization making sure that even for MOR tables we
fetch absolute minimum
- // of the stored data possible, while still properly executing
corresponding relation's semantic
- // and meet the query's requirements.
- //
- // Here we assume that iff queried table does use one of the
standard (and whitelisted)
- // Record Payload classes then we can avoid reading and parsing the
records w/ _full_ schema,
- // and instead only rely on projected one, nevertheless being able
to perform merging correctly
- if (isProjectionCompatible(tableState)) {
- fileReaders.requiredSchemaReader
- } else {
- fileReaders.fullSchemaReader
- }
- }
-
- override protected def getPartitions: Array[Partition] =
- fileSplits.zipWithIndex.map(file => HoodieMergeOnReadPartition(file._2,
file._1)).toArray
-
- private def getHadoopConf: Configuration = {
- val conf = hadoopConfBroadcast.value.value
- // TODO clean up, this lock is unnecessary
- CONFIG_INSTANTIATION_LOCK.synchronized {
- new Configuration(conf)
- }
- }
-}
-
-object HoodieMergeOnReadRDDV1 {
- val CONFIG_INSTANTIATION_LOCK = new Object()
-}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDDV2.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDDV2.scala
index 0b86ac6e3bf9..a1523a4f84e9 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDDV2.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDDV2.scala
@@ -20,7 +20,6 @@ package org.apache.hudi
import org.apache.hudi.HoodieBaseRelation.{projectReader, BaseFileReader}
import org.apache.hudi.HoodieMergeOnReadRDDV2.CONFIG_INSTANTIATION_LOCK
-import org.apache.hudi.LogFileIterator.getPartitionPath
import org.apache.hudi.avro.HoodieAvroReaderContext
import org.apache.hudi.common.config.{HoodieReaderConfig, TypedProperties}
import org.apache.hudi.common.config.HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE
@@ -36,6 +35,7 @@ import org.apache.hudi.expression.{Predicate => HPredicate}
import
org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
import
org.apache.hudi.metadata.HoodieTableMetadata.getDataTableBasePathFromMetadataTable
import org.apache.hudi.metadata.HoodieTableMetadataUtil
+import org.apache.hudi.storage.StoragePath
import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration
import org.apache.avro.Schema
@@ -255,6 +255,16 @@ class HoodieMergeOnReadRDDV2(@transient sc: SparkContext,
override def close(): Unit = closeableFileGroupRecordIterator.close()
}
}
+
+ private def getPartitionPath(split: HoodieMergeOnReadFileSplit): StoragePath
= {
+ // Determine partition path as an immediate parent folder of either
+ // - The base file
+ // - Some log file
+ split.dataFile.map(baseFile =>
+
sparkAdapter.getSparkPartitionedFileUtils.getPathFromPartitionedFile(baseFile))
+ .getOrElse(split.logFiles.head.getPath)
+ .getParent
+ }
}
object HoodieMergeOnReadRDDV2 {
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala
deleted file mode 100644
index a6e158f1bc10..000000000000
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala
+++ /dev/null
@@ -1,405 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi
-
-import org.apache.hudi.HoodieBaseRelation.BaseFileReader
-import org.apache.hudi.HoodieConversionUtils.{toJavaOption, toScalaOption}
-import org.apache.hudi.HoodieDataSourceHelper.AvroDeserializerSupport
-import org.apache.hudi.LogFileIterator.{getPartitionPath, scanLog}
-import org.apache.hudi.avro.{AvroRecordContext, AvroSchemaCache}
-import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMemoryConfig,
HoodieMetadataConfig, TypedProperties}
-import org.apache.hudi.common.engine.{EngineType, HoodieLocalEngineContext,
RecordContext}
-import org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath
-import org.apache.hudi.common.model.{HoodieAvroIndexedRecord,
HoodieEmptyRecord, HoodieLogFile, HoodieOperation, HoodieRecord,
HoodieSparkRecord}
-import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
-import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner
-import org.apache.hudi.common.table.read.{BufferedRecord, BufferedRecords,
DeleteContext}
-import org.apache.hudi.common.util.{FileIOUtils, HoodieRecordUtils}
-import org.apache.hudi.config.HoodiePayloadConfig
-import org.apache.hudi.hadoop.fs.HadoopFSUtils
-import
org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
-import org.apache.hudi.internal.schema.InternalSchema
-import org.apache.hudi.metadata.{HoodieBackedTableMetadata,
HoodieTableMetadata}
-import
org.apache.hudi.metadata.HoodieTableMetadata.getDataTableBasePathFromMetadataTable
-import org.apache.hudi.storage.{HoodieStorageUtils, StoragePath}
-import org.apache.hudi.util.CachingIterator
-
-import org.apache.avro.Schema
-import org.apache.avro.generic.{GenericRecord, IndexedRecord}
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.mapred.JobConf
-import
org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection
-import org.apache.spark.sql.HoodieInternalRowUtils
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Projection
-import org.apache.spark.sql.types.StructType
-
-import java.io.Closeable
-
-import scala.annotation.tailrec
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-
-/**
- * Provided w/ list of log files, iterates over all of the records stored in
- * Delta Log files (represented as [[InternalRow]]s)
- */
-class LogFileIterator(logFiles: List[HoodieLogFile],
- partitionPath: StoragePath,
- tableSchema: HoodieTableSchema,
- requiredStructTypeSchema: StructType,
- requiredAvroSchema: Schema,
- tableState: HoodieTableState,
- config: Configuration)
- extends CachingIterator[InternalRow] with AvroDeserializerSupport {
-
- def this(logFiles: List[HoodieLogFile],
- partitionPath: StoragePath,
- tableSchema: HoodieTableSchema,
- requiredSchema: HoodieTableSchema,
- tableState: HoodieTableState,
- config: Configuration) {
- this(logFiles, partitionPath, tableSchema, requiredSchema.structTypeSchema,
- new Schema.Parser().parse(requiredSchema.avroSchemaStr), tableState,
config)
- }
- def this(split: HoodieMergeOnReadFileSplit,
- tableSchema: HoodieTableSchema,
- requiredSchema: HoodieTableSchema,
- tableState: HoodieTableState,
- config: Configuration) {
- this(split.logFiles, getPartitionPath(split), tableSchema, requiredSchema,
tableState, config)
- }
- private val maxCompactionMemoryInBytes: Long =
getMaxCompactionMemoryInBytes(new JobConf(config))
-
- protected val payloadProps: TypedProperties = if
(tableState.orderingFields.nonEmpty) {
- HoodiePayloadConfig.newBuilder
- .withPayloadOrderingFields(String.join(",", tableState.orderingFields:
_*))
- .build
- .getProps
- } else {
- new TypedProperties()
- }
-
- protected override val avroSchema: Schema = requiredAvroSchema
- protected override val structTypeSchema: StructType =
requiredStructTypeSchema
-
- protected val logFileReaderAvroSchema: Schema = AvroSchemaCache.intern(new
Schema.Parser().parse(tableSchema.avroSchemaStr))
- protected val logFileReaderStructType: StructType =
tableSchema.structTypeSchema
- protected val deleteContext: DeleteContext = new DeleteContext(payloadProps,
logFileReaderAvroSchema).withReaderSchema(logFileReaderAvroSchema);
-
- private val requiredSchemaAvroProjection: AvroProjection =
AvroProjection.create(avroSchema)
- private val requiredSchemaRowProjection: Projection =
generateUnsafeProjection(logFileReaderStructType, structTypeSchema)
-
- private val logRecords = {
- val internalSchema =
tableSchema.internalSchema.getOrElse(InternalSchema.getEmptyInternalSchema)
-
- scanLog(logFiles, partitionPath, logFileReaderAvroSchema, tableState,
- maxCompactionMemoryInBytes, config, internalSchema)
- }
-
- private val (hasOperationField, operationFieldPos) = {
- val operationField =
logFileReaderAvroSchema.getField(HoodieRecord.OPERATION_METADATA_FIELD)
- if (operationField != null) {
- (true, operationField.pos())
- } else {
- (false, -1)
- }
- }
-
- protected def isDeleteOperation(r: InternalRow): Boolean = if
(hasOperationField) {
- val operation = r.getString(operationFieldPos)
- HoodieOperation.fromName(operation) == HoodieOperation.DELETE
- } else {
- false
- }
-
- protected def isDeleteOperation(r: GenericRecord): Boolean = if
(hasOperationField) {
- val operation = r.get(operationFieldPos).toString
- HoodieOperation.fromName(operation) == HoodieOperation.DELETE
- } else {
- false
- }
-
- def logRecordsPairIterator(): Iterator[(String, HoodieRecord[_])] = {
- logRecords.iterator
- }
-
- // NOTE: This have to stay lazy to make sure it's initialized only at the
point where it's
- // going to be used, since we modify `logRecords` before that and
therefore can't do it any earlier
- private lazy val logRecordsIterator: Iterator[Option[HoodieRecord[_]]] =
- logRecords.iterator.map {
- case (_, record: HoodieSparkRecord) => Option(record)
- case (_, _: HoodieEmptyRecord[_]) => Option.empty
- case (_, record) =>
- toScalaOption(record.toIndexedRecord(logFileReaderAvroSchema,
payloadProps))
-
- }
-
- protected def removeLogRecord(key: String): Option[HoodieRecord[_]] =
logRecords.remove(key)
-
- protected def doHasNext: Boolean = hasNextInternal
-
- // NOTE: It's crucial for this method to be annotated w/ [[@tailrec]] to
make sure
- // that recursion is unfolded into a loop to avoid stack overflows
while
- // handling records
- @tailrec private def hasNextInternal: Boolean = {
- logRecordsIterator.hasNext && {
- logRecordsIterator.next() match {
- case Some(r: HoodieAvroIndexedRecord) =>
- val data = r.getData.asInstanceOf[GenericRecord]
- if (isDeleteOperation(data)) {
- this.hasNextInternal
- } else {
- val projectedAvroRecord = requiredSchemaAvroProjection(data)
- nextRecord = deserialize(projectedAvroRecord)
- true
- }
- case Some(r: HoodieSparkRecord) =>
- val data = r.getData
- if (isDeleteOperation(data)) {
- this.hasNextInternal
- } else {
- nextRecord = requiredSchemaRowProjection(data)
- true
- }
- case None => this.hasNextInternal
- }
- }
- }
-}
-
-/**
- * Provided w/ list of log files and base file iterator, provides an iterator
over all of the records stored in
- * Base file as well as all of the Delta Log files simply returning
concatenation of these streams, while not
- * performing any combination/merging of the records w/ the same primary keys
(ie producing duplicates potentially)
- */
-class SkipMergeIterator(logFiles: List[HoodieLogFile],
- partitionPath: StoragePath,
- baseFileIterator: Iterator[InternalRow],
- readerSchema: StructType,
- dataSchema: HoodieTableSchema,
- requiredStructTypeSchema: StructType,
- requiredAvroSchema: Schema,
- tableState: HoodieTableState,
- config: Configuration)
- extends LogFileIterator(logFiles, partitionPath, dataSchema,
requiredStructTypeSchema, requiredAvroSchema, tableState, config) {
-
- def this(split: HoodieMergeOnReadFileSplit, baseFileReader: BaseFileReader,
dataSchema: HoodieTableSchema,
- requiredSchema: HoodieTableSchema, tableState: HoodieTableState,
config: Configuration) {
- this(split.logFiles, getPartitionPath(split),
baseFileReader(split.dataFile.get),
- baseFileReader.schema, dataSchema, requiredSchema.structTypeSchema,
- new Schema.Parser().parse(requiredSchema.avroSchemaStr), tableState,
config)
- }
-
- private val requiredSchemaProjection =
generateUnsafeProjection(readerSchema, structTypeSchema)
-
- override def doHasNext: Boolean = {
- if (baseFileIterator.hasNext) {
- // No merge is required, simply load current row and project into
required schema
- nextRecord = requiredSchemaProjection(baseFileIterator.next())
- true
- } else {
- super[LogFileIterator].doHasNext
- }
- }
-}
-
-/**
- * Provided w/ list of log files and base file iterator, provides an iterator
over all of the records stored in
- * a) Base file and all of the b) Delta Log files combining records with the
same primary key from both of these
- * streams
- */
-class RecordMergingFileIterator(logFiles: List[HoodieLogFile],
- partitionPath: StoragePath,
- baseFileIterator: Iterator[InternalRow],
- readerSchema: StructType,
- dataSchema: HoodieTableSchema,
- requiredStructTypeSchema: StructType,
- requiredAvroSchema: Schema,
- tableState: HoodieTableState,
- config: Configuration)
- extends LogFileIterator(logFiles, partitionPath, dataSchema,
requiredStructTypeSchema, requiredAvroSchema, tableState, config) {
-
- def this(logFiles: List[HoodieLogFile],
- partitionPath: StoragePath,
- baseFileIterator: Iterator[InternalRow],
- readerSchema: StructType,
- dataSchema: HoodieTableSchema,
- requiredSchema: HoodieTableSchema,
- tableState: HoodieTableState,
- config: Configuration) {
- this(logFiles, partitionPath, baseFileIterator, readerSchema, dataSchema,
requiredSchema.structTypeSchema,
- new Schema.Parser().parse(requiredSchema.avroSchemaStr), tableState,
config)
- }
- def this(split: HoodieMergeOnReadFileSplit, baseFileReader: BaseFileReader,
dataSchema: HoodieTableSchema,
- requiredSchema: HoodieTableSchema, tableState: HoodieTableState,
config: Configuration) {
- this(split.logFiles, getPartitionPath(split),
baseFileReader(split.dataFile.get),
- baseFileReader.schema, dataSchema, requiredSchema, tableState, config)
- }
-
- // NOTE: Record-merging iterator supports 2 modes of operation merging
records bearing either
- // - Full table's schema
- // - Projected schema
- // As such, no particular schema could be assumed, and therefore we
rely on the caller
- // to correspondingly set the schema of the expected output of
base-file reader
- private val baseFileReaderAvroSchema =
AvroSchemaCache.intern(sparkAdapter.getAvroSchemaConverters.toAvroType(readerSchema,
nullable = false, "record"))
-
- private val serializer = sparkAdapter.createAvroSerializer(readerSchema,
baseFileReaderAvroSchema, nullable = false)
-
- private val recordKeyOrdinal =
readerSchema.fieldIndex(tableState.recordKeyField)
-
- private val requiredSchemaProjection =
generateUnsafeProjection(readerSchema, structTypeSchema)
- private val requiredSchemaAvroProjection = AvroProjection.create(avroSchema)
-
- private val recordMerger =
HoodieRecordUtils.createRecordMerger(tableState.tablePath, EngineType.SPARK,
- tableState.recordMergeImplClasses.asJava, tableState.recordMergeStrategyId)
-
- private val rowRecordContext: RecordContext[InternalRow] =
SparkFileFormatInternalRecordContext.getFieldAccessorInstance
- private val avroRecordContext: RecordContext[IndexedRecord] =
AvroRecordContext.getFieldAccessorInstance
- private val orderingFields: Array[String] = tableState.orderingFields.toArray
-
- override def doHasNext: Boolean = hasNextInternal
-
- // NOTE: It's crucial for this method to be annotated w/ [[@tailrec]] to
make sure
- // that recursion is unfolded into a loop to avoid stack overflows
while
- // handling records
- @tailrec private def hasNextInternal: Boolean = {
- if (baseFileIterator.hasNext) {
- val curRow = baseFileIterator.next()
- val curKey = curRow.getString(recordKeyOrdinal)
- val updatedRecordOpt = removeLogRecord(curKey)
- if (updatedRecordOpt.isEmpty) {
- // No merge is required, simply load current row and project into
required schema
- nextRecord = requiredSchemaProjection(curRow)
- true
- } else {
- val mergedRecordOpt = merge(curRow, updatedRecordOpt.get)
- if (mergedRecordOpt.isEmpty) {
- // Record has been deleted, skipping
- this.hasNextInternal
- } else {
- nextRecord = mergedRecordOpt.get
- true
- }
- }
- } else {
- super[LogFileIterator].doHasNext
- }
- }
-
- private def serialize(curRowRecord: InternalRow): GenericRecord =
- serializer.serialize(curRowRecord).asInstanceOf[GenericRecord]
-
- private def merge(curRow: InternalRow, newRecord: HoodieRecord[_]):
Option[InternalRow] = {
- // NOTE: We have to pass in Avro Schema used to read from Delta Log file
since we invoke combining API
- // on the record from the Delta Log
- recordMerger.getRecordType match {
- case HoodieRecordType.SPARK =>
- val curRecord = BufferedRecords.fromEngineRecord(curRow,
baseFileReaderAvroSchema, rowRecordContext, orderingFields,
newRecord.getRecordKey, false)
- val newBufferedRecord = BufferedRecords.fromHoodieRecord(newRecord,
logFileReaderAvroSchema, rowRecordContext, payloadProps, orderingFields,
deleteContext)
- val result = recordMerger.merge(curRecord, newBufferedRecord,
rowRecordContext, payloadProps).asInstanceOf[BufferedRecord[InternalRow]]
- if (result.isDelete) {
- None
- } else {
- val schema =
HoodieInternalRowUtils.getCachedSchema(rowRecordContext.getSchemaFromBufferRecord(result))
- val projection =
HoodieInternalRowUtils.getCachedUnsafeProjection(schema, structTypeSchema)
- Some(projection.apply(result.getRecord))
- }
- case _ =>
- val curRecord = BufferedRecords.fromEngineRecord(serialize(curRow),
baseFileReaderAvroSchema, avroRecordContext, orderingFields,
newRecord.getRecordKey, false)
- val newBufferedRecord = BufferedRecords.fromHoodieRecord(newRecord,
logFileReaderAvroSchema, avroRecordContext, payloadProps, orderingFields,
deleteContext)
- val result = recordMerger.merge(curRecord, newBufferedRecord,
avroRecordContext, payloadProps).asInstanceOf[BufferedRecord[IndexedRecord]]
- if (result.isDelete) {
- None
- } else {
- val avroRecord = result.getRecord.asInstanceOf[GenericRecord]
- Some(deserialize(requiredSchemaAvroProjection(avroRecord)))
- }
- }
- }
-}
-
-object LogFileIterator extends SparkAdapterSupport {
-
- def scanLog(logFiles: List[HoodieLogFile],
- partitionPath: StoragePath,
- logSchema: Schema,
- tableState: HoodieTableState,
- maxCompactionMemoryInBytes: Long,
- hadoopConf: Configuration,
- internalSchema: InternalSchema =
InternalSchema.getEmptyInternalSchema): mutable.Map[String, HoodieRecord[_]] = {
- val tablePath = tableState.tablePath
- val storage = HoodieStorageUtils.getStorage(tablePath,
HadoopFSUtils.getStorageConf(hadoopConf))
-
- val logRecordScannerBuilder = HoodieMergedLogRecordScanner.newBuilder()
- .withStorage(storage)
- .withBasePath(tablePath)
- .withLogFilePaths(logFiles.map(logFile =>
logFile.getPath.toString).asJava)
- .withReaderSchema(logSchema)
- // NOTE: This part shall only be reached when at least one log is
present in the file-group
- // entailing that table has to have at least one commit
- .withLatestInstantTime(tableState.latestCommitTimestamp.get)
- .withReverseReader(false)
- .withInternalSchema(internalSchema)
- .withBufferSize(
- hadoopConf.getInt(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.key(),
- HoodieMemoryConfig.DEFAULT_MR_MAX_DFS_STREAM_BUFFER_SIZE))
- .withMaxMemorySizeInBytes(maxCompactionMemoryInBytes)
- .withSpillableMapBasePath(
- hadoopConf.get(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.key,
- FileIOUtils.getDefaultSpillableMapBasePath))
- .withDiskMapType(
- hadoopConf.getEnum(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key,
- HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue))
- .withBitCaskDiskMapCompressionEnabled(
-
hadoopConf.getBoolean(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
-
HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()))
-
- if (logFiles.nonEmpty) {
- logRecordScannerBuilder.withPartition(getRelativePartitionPath(
- new StoragePath(tableState.tablePath),
logFiles.head.getPath.getParent))
- }
-
- logRecordScannerBuilder.withRecordMerger(
- HoodieRecordUtils.createRecordMerger(tableState.tablePath,
EngineType.SPARK, tableState.recordMergeImplClasses.asJava,
tableState.recordMergeStrategyId))
-
- val scanner = logRecordScannerBuilder.build()
-
- closing(scanner) {
- // NOTE: We have to copy record-map (by default immutable copy is
exposed)
- mutable.HashMap(scanner.getRecords.asScala.toSeq: _*)
- }
- }
-
- def closing[T](c: Closeable)(f: => T): T = {
- try { f } finally {
- c.close()
- }
- }
-
- def getPartitionPath(split: HoodieMergeOnReadFileSplit): StoragePath = {
- // Determine partition path as an immediate parent folder of either
- // - The base file
- // - Some log file
- split.dataFile.map(baseFile =>
-
sparkAdapter.getSparkPartitionedFileUtils.getPathFromPartitionedFile(baseFile))
- .getOrElse(split.logFiles.head.getPath)
- .getParent
- }
-}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV1.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV1.scala
index 10eb1cfd272e..522152558085 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV1.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV1.scala
@@ -86,18 +86,20 @@ case class MergeOnReadIncrementalRelationV1(override val
sqlContext: SQLContext,
val optionalFilters = filters
val readers = createBaseFileReaders(tableSchema, requiredSchema,
requestedColumns, requiredFilters, optionalFilters)
- new HoodieMergeOnReadRDDV1(
+ new HoodieMergeOnReadRDDV2(
sqlContext.sparkContext,
config = jobConf,
+ sqlConf = sqlContext.sparkSession.sessionState.conf,
fileReaders = readers,
tableSchema = tableSchema,
requiredSchema = requiredSchema,
tableState = tableState,
mergeType = mergeType,
fileSplits = fileSplits,
- includeStartTime = includeStartTime,
- startTimestamp = startTs,
- endTimestamp = endTs)
+ includedInstantTimeSet =
Option(includedCommits.map(_.requestedTime).toSet),
+ optionalFilters = optionalFilters,
+ metaClient = metaClient,
+ options = optParams)
}
override protected def collectFileSplits(partitionFilters: Seq[Expression],
dataFilters: Seq[Expression]): List[HoodieMergeOnReadFileSplit] = {
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
index 471f9a3f0a6e..39ef098b5f30 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
@@ -23,6 +23,7 @@ import org.apache.hudi.{AvroConversionUtils,
HoodieTableSchema, SparkAdapterSupp
import org.apache.hudi.HoodieConversionUtils.toJavaOption
import org.apache.hudi.HoodieDataSourceHelper.AvroDeserializerSupport
import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.cdc.CDCFileGroupIterator.{CDC_OPERATION_DELETE,
CDC_OPERATION_INSERT, CDC_OPERATION_UPDATE}
import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMemoryConfig,
HoodieMetadataConfig, HoodieReaderConfig, TypedProperties}
import
org.apache.hudi.common.config.HoodieCommonConfig.{DISK_MAP_BITCASK_COMPRESSION_ENABLED,
SPILLABLE_DISK_MAP_TYPE}
import org.apache.hudi.common.config.HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH
@@ -48,6 +49,7 @@ import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.conf.Configuration
import
org.apache.parquet.avro.HoodieAvroParquetSchemaConverter.getAvroSchemaConverter
+import org.apache.spark.Partition
import
org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection
import org.apache.spark.sql.HoodieInternalRowUtils
import org.apache.spark.sql.avro.HoodieAvroDeserializer
@@ -66,6 +68,12 @@ import scala.annotation.tailrec
import scala.collection.JavaConverters._
import scala.collection.mutable
+/**
+ * The CDC file splits of one file group, processed by a single Spark task via [[CDCFileGroupIterator]].
+ * The [[changes]] must already be sorted when the split is constructed.
+ */
+case class HoodieCDCFileGroupSplit(changes: Array[HoodieCDCFileSplit])
+
class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
metaClient: HoodieTableMetaClient,
conf: StorageConfiguration[Configuration],
@@ -351,7 +359,7 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
// no real record is deleted, just ignore.
} else {
// there is a real record deleted.
- recordToLoad.update(0, CDCRelation.CDC_OPERATION_DELETE)
+ recordToLoad.update(0, CDC_OPERATION_DELETE)
recordToLoad.update(2,
convertBufferedRecordToJsonString(existingRecordOpt.get))
recordToLoad.update(3, null)
loaded = true
@@ -360,7 +368,7 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
val existingRecordOpt = beforeImageRecords.get(logRecord.getRecordKey)
if (existingRecordOpt.isEmpty) {
// a new record is inserted.
- recordToLoad.update(0, CDCRelation.CDC_OPERATION_INSERT)
+ recordToLoad.update(0, CDC_OPERATION_INSERT)
recordToLoad.update(2, null)
recordToLoad.update(3, convertBufferedRecordToJsonString(logRecord))
// insert into beforeImageRecords
@@ -371,7 +379,7 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
val existingRecord = existingRecordOpt.get
val mergeRecord = merge(existingRecord, logRecord)
if (existingRecord != mergeRecord) {
- recordToLoad.update(0, CDCRelation.CDC_OPERATION_UPDATE)
+ recordToLoad.update(0, CDC_OPERATION_UPDATE)
recordToLoad.update(2,
convertBufferedRecordToJsonString(existingRecord))
recordToLoad.update(3,
convertBufferedRecordToJsonString(mergeRecord))
// update into beforeImageRecords
@@ -460,11 +468,11 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
recordToLoad = currentCDCFileSplit.getCdcInferCase match {
case BASE_FILE_INSERT =>
InternalRow.fromSeq(Seq(
- CDCRelation.CDC_OPERATION_INSERT,
convertToUTF8String(currentInstant),
+ CDC_OPERATION_INSERT, convertToUTF8String(currentInstant),
null, null))
case BASE_FILE_DELETE =>
InternalRow.fromSeq(Seq(
- CDCRelation.CDC_OPERATION_DELETE,
convertToUTF8String(currentInstant),
+ CDC_OPERATION_DELETE, convertToUTF8String(currentInstant),
null, null))
case LOG_FILE =>
InternalRow.fromSeq(Seq(
@@ -476,7 +484,7 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
null, null))
case REPLACE_COMMIT =>
InternalRow.fromSeq(Seq(
- CDCRelation.CDC_OPERATION_DELETE,
convertToUTF8String(currentInstant),
+ CDC_OPERATION_DELETE, convertToUTF8String(currentInstant),
null, null))
}
}
@@ -594,3 +602,13 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
}
}
}
+
+/**
+ * Pre-encoded [[UTF8String]] values for the CDC operation column (ordinal 0 of every emitted row),
+ * shared by all rows instead of being re-encoded for each record.
+ */
+object CDCFileGroupIterator {
+  val CDC_OPERATION_INSERT: UTF8String = UTF8String.fromString(INSERT.getValue)
+  val CDC_OPERATION_UPDATE: UTF8String = UTF8String.fromString(UPDATE.getValue)
+  val CDC_OPERATION_DELETE: UTF8String = UTF8String.fromString(DELETE.getValue)
+}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCRelation.scala
deleted file mode 100644
index bfcbbc031018..000000000000
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCRelation.scala
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.cdc
-
-import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions,
HoodieDataSourceHelper, HoodieTableSchema}
-import org.apache.hudi.common.table.{HoodieTableMetaClient,
TableSchemaResolver}
-import org.apache.hudi.common.table.cdc.HoodieCDCExtractor
-import org.apache.hudi.common.table.cdc.HoodieCDCOperation._
-import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode._
-import org.apache.hudi.common.table.cdc.HoodieCDCUtils._
-import org.apache.hudi.common.table.log.InstantRange
-import org.apache.hudi.common.table.log.InstantRange.RangeType
-import org.apache.hudi.exception.HoodieException
-import org.apache.hudi.internal.schema.InternalSchema
-
-import org.apache.spark.internal.Logging
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{Row, SparkSession, SQLContext}
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
-import org.apache.spark.sql.types.{StringType, StructField, StructType}
-import org.apache.spark.unsafe.types.UTF8String
-
-import scala.collection.JavaConverters._
-import scala.util.{Failure, Success, Try}
-
-/**
- * Hoodie CDC Relation extends Spark's [[BaseRelation]], provide the schema of
cdc
- * and the [[buildScan]] to return the change-data in a specified range.
- */
-class CDCRelation(
- override val sqlContext: SQLContext,
- metaClient: HoodieTableMetaClient,
- startInstant: String,
- endInstant: String,
- options: Map[String, String],
- rangeType: RangeType = InstantRange.RangeType.OPEN_CLOSED
-) extends BaseRelation with PrunedFilteredScan with Logging {
-
- imbueConfigs(sqlContext)
-
- val spark: SparkSession = sqlContext.sparkSession
-
- val (tableAvroSchema, _) = {
- val schemaUtil = new TableSchemaResolver(metaClient)
- val avroSchema = Try(schemaUtil.getTableAvroSchema) match {
- case Success(schema) => schema
- case Failure(e) =>
- throw new IllegalArgumentException("Failed to fetch schema from the
table", e)
- }
- // try to find internalSchema
- val internalSchemaFromMeta = try {
-
schemaUtil.getTableInternalSchemaFromCommitMetadata.orElse(InternalSchema.getEmptyInternalSchema)
- } catch {
- case _: Exception => InternalSchema.getEmptyInternalSchema
- }
- (avroSchema, internalSchemaFromMeta)
- }
-
- val tableStructSchema: StructType =
AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema)
-
- val cdcExtractor: HoodieCDCExtractor =
- new HoodieCDCExtractor(
- metaClient,
- InstantRange.builder()
- .startInstant(startInstant)
- .endInstant(endInstant)
- .nullableBoundary(true)
- .rangeType(rangeType).build(),
- false)
-
- override final def needConversion: Boolean = false
-
- override def schema: StructType = CDCRelation.FULL_CDC_SPARK_SCHEMA
-
- override def buildScan(requiredColumns: Array[String], filters:
Array[Filter]): RDD[Row] = {
- val internalRows = buildScan0(requiredColumns, filters)
- internalRows.asInstanceOf[RDD[Row]]
- }
-
- def buildScan0(requiredColumns: Array[String], filters: Array[Filter]):
RDD[InternalRow] = {
- val nameToField = schema.fields.map(f => f.name -> f).toMap
- val requiredSchema = StructType(requiredColumns.map(nameToField))
- val originTableSchema = HoodieTableSchema(tableStructSchema,
tableAvroSchema.toString)
- val parquetReader = HoodieDataSourceHelper.buildHoodieParquetReader(
- sparkSession = spark,
- dataSchema = tableStructSchema,
- partitionSchema = StructType(Nil),
- requiredSchema = tableStructSchema,
- filters = Nil,
- options = options,
- hadoopConf = spark.sessionState.newHadoopConf()
- )
-
- val changes = cdcExtractor.extractCDCFileSplits().values().asScala.map {
splits =>
- HoodieCDCFileGroupSplit(
- splits.asScala.sorted.toArray
- )
- }
- val cdcRdd = new HoodieCDCRDD(
- spark,
- metaClient,
- parquetReader,
- originTableSchema,
- schema,
- requiredSchema,
- changes.toArray
- )
- cdcRdd.asInstanceOf[RDD[InternalRow]]
- }
-
- def imbueConfigs(sqlContext: SQLContext): Unit = {
- // Disable vectorized reading for CDC relation
-
sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.enableVectorizedReader",
"false")
- }
-}
-
-object CDCRelation {
-
- val CDC_OPERATION_DELETE: UTF8String = UTF8String.fromString(DELETE.getValue)
- val CDC_OPERATION_INSERT: UTF8String = UTF8String.fromString(INSERT.getValue)
- val CDC_OPERATION_UPDATE: UTF8String = UTF8String.fromString(UPDATE.getValue)
-
- /**
- * CDC Schema For Spark.
- * Also it's schema when `hoodie.table.cdc.supplemental.logging.mode` is
[[DATA_BEFORE_AFTER]].
- * Here we use the debezium format.
- */
- val FULL_CDC_SPARK_SCHEMA: StructType = {
- StructType(
- Seq(
- StructField(CDC_OPERATION_TYPE, StringType),
- StructField(CDC_COMMIT_TIMESTAMP, StringType),
- StructField(CDC_BEFORE_IMAGE, StringType),
- StructField(CDC_AFTER_IMAGE, StringType)
- )
- )
- }
-
- /**
- * CDC Schema For Spark when `hoodie.table.cdc.supplemental.logging.mode` is
[[OP_KEY_ONLY]].
- */
- val MIN_CDC_SPARK_SCHEMA: StructType = {
- StructType(
- Seq(
- StructField(CDC_OPERATION_TYPE, StringType),
- StructField(CDC_RECORD_KEY, StringType)
- )
- )
- }
-
- /**
- * CDC Schema For Spark when `hoodie.table.cdc.supplemental.logging.mode` is
[[DATA_BEFORE]].
- */
- val CDC_WITH_BEFORE_SPARK_SCHEMA: StructType = {
- StructType(
- Seq(
- StructField(CDC_OPERATION_TYPE, StringType),
- StructField(CDC_RECORD_KEY, StringType),
- StructField(CDC_BEFORE_IMAGE, StringType)
- )
- )
- }
-
- def isCDCEnabled(metaClient: HoodieTableMetaClient): Boolean = {
- metaClient.getTableConfig.isCDCEnabled
- }
-
- /**
- * The only approach to create the CDC relation.
- */
- def getCDCRelation(
- sqlContext: SQLContext,
- metaClient: HoodieTableMetaClient,
- options: Map[String, String],
- rangeType: RangeType = RangeType.OPEN_CLOSED): CDCRelation = {
-
- if (!isCDCEnabled(metaClient)) {
- throw new IllegalArgumentException(s"It isn't a CDC hudi table on
${metaClient.getBasePath}")
- }
-
- val startCompletionTime =
options.getOrElse(DataSourceReadOptions.START_COMMIT.key(),
- throw new HoodieException(s"CDC Query should provide the valid start
completion time "
- + s"through the option ${DataSourceReadOptions.START_COMMIT.key()}")
- )
- val endCompletionTime =
options.getOrElse(DataSourceReadOptions.END_COMMIT.key(),
- getTimestampOfLatestInstant(metaClient)
- )
-
- new CDCRelation(sqlContext, metaClient, startCompletionTime,
endCompletionTime, options, rangeType)
- }
-
- def getTimestampOfLatestInstant(metaClient: HoodieTableMetaClient): String =
{
- val latestInstant = metaClient.getActiveTimeline.lastInstant()
- if (latestInstant.isPresent) {
- latestInstant.get().requestedTime
- } else {
- throw new HoodieException("No valid instant in Active Timeline.")
- }
- }
-}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCDCFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCFileIndex.scala
similarity index 62%
rename from hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCDCFileIndex.scala
rename to hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCFileIndex.scala
index 81eecf3272a5..bf8f0f52c2bd 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCDCFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCFileIndex.scala
@@ -17,19 +17,22 @@
* under the License.
*/
-package org.apache.hudi
+package org.apache.hudi.cdc
-import org.apache.hudi.cdc.CDCRelation
+import org.apache.hudi.{DataSourceReadOptions, HoodieFileIndex}
+import org.apache.hudi.cdc.HoodieCDCFileIndex.isCDCEnabled
import org.apache.hudi.common.table.HoodieTableMetaClient
-import org.apache.hudi.common.table.cdc.HoodieCDCExtractor
+import org.apache.hudi.common.table.cdc.{HoodieCDCExtractor, HoodieCDCUtils}
+import org.apache.hudi.common.table.log.InstantRange
import org.apache.hudi.common.table.log.InstantRange.RangeType
+import org.apache.hudi.exception.HoodieException
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Expression,
GenericInternalRow}
import org.apache.spark.sql.execution.datasources.{FileIndex, FileStatusCache,
NoopCache, PartitionDirectory}
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
import scala.collection.JavaConverters._
@@ -44,8 +47,28 @@ class HoodieCDCFileIndex(override val spark: SparkSession,
spark, metaClient, schemaSpec, options, fileStatusCache, includeLogFiles,
shouldEmbedFileSlices = true
) with FileIndex {
private val emptyPartitionPath: String = "empty_partition_path";
- val cdcRelation: CDCRelation = CDCRelation.getCDCRelation(spark.sqlContext, metaClient, options, rangeType)
- val cdcExtractor: HoodieCDCExtractor = cdcRelation.cdcExtractor
+
+  // Reject tables whose table config does not enable CDC.
+  if (!isCDCEnabled(metaClient)) {
+    throw new IllegalArgumentException(s"It isn't a CDC hudi table on ${metaClient.getBasePath}")
+  }
+
+  // Start of the queried range; there is no default, so a missing option aborts the query.
+  private val startCompletionTime: String = options.getOrElse(DataSourceReadOptions.START_COMMIT.key(),
+    throw new HoodieException(s"CDC Query should provide the valid start completion time "
+      + s"through the option ${DataSourceReadOptions.START_COMMIT.key()}"))
+
+  // End of the queried range; defaults to the latest instant on the active timeline.
+  private val endCompletionTime: String = options.getOrElse(DataSourceReadOptions.END_COMMIT.key(),
+    HoodieCDCFileIndex.getTimestampOfLatestInstant(metaClient))
+
+  private val cdcExtractor: HoodieCDCExtractor = new HoodieCDCExtractor(metaClient,
+    InstantRange.builder()
+      .startInstant(startCompletionTime)
+      .endInstant(endCompletionTime)
+      .nullableBoundary(true)
+      .rangeType(rangeType).build(),
+    false)
override def listFiles(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
hasPushedDownPartitionPredicates = true
@@ -81,3 +104,40 @@ class HoodieCDCFileIndex(override val spark: SparkSession,
}.toArray
}
}
+
+object HoodieCDCFileIndex {
+
+  /**
+   * Spark schema of CDC query results: operation, commit timestamp, before image and after image,
+   * all typed as strings (Debezium-style change record).
+   * This is also the layout used when `hoodie.table.cdc.supplemental.logging.mode` is [[DATA_BEFORE_AFTER]].
+   */
+  val FULL_CDC_SPARK_SCHEMA: StructType = StructType(
+    Seq(
+      HoodieCDCUtils.CDC_OPERATION_TYPE,
+      HoodieCDCUtils.CDC_COMMIT_TIMESTAMP,
+      HoodieCDCUtils.CDC_BEFORE_IMAGE,
+      HoodieCDCUtils.CDC_AFTER_IMAGE
+    ).map(fieldName => StructField(fieldName, StringType)))
+
+  /**
+   * Default end of the CDC range: the requested time of the last instant on the active timeline.
+   *
+   * NOTE(review): the timeline is not filtered to completed instants here, and the range options
+   * are described as completion times (see the START_COMMIT error message); confirm that a
+   * requested time is the intended upper bound.
+   *
+   * @throws HoodieException if the active timeline has no instants
+   */
+  private def getTimestampOfLatestInstant(metaClient: HoodieTableMetaClient): String = {
+    val lastInstantOpt = metaClient.getActiveTimeline.lastInstant()
+    if (!lastInstantOpt.isPresent) {
+      throw new HoodieException("No valid instant in Active Timeline.")
+    }
+    lastInstantOpt.get().requestedTime
+  }
+
+  /** Returns whether the table config behind `metaClient` has CDC enabled. */
+  def isCDCEnabled(metaClient: HoodieTableMetaClient): Boolean =
+    metaClient.getTableConfig.isCDCEnabled
+}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
deleted file mode 100644
index 97b572ed98b2..000000000000
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
+++ /dev/null
@@ -1,602 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.cdc
-
-import org.apache.hudi.{AvroConversionUtils, AvroProjection, HoodieFileIndex,
HoodieMergeOnReadFileSplit, HoodieTableSchema, HoodieTableState,
HoodieUnsafeRDD, LogFileIterator, RecordMergingFileIterator,
SparkAdapterSupport}
-import org.apache.hudi.HoodieBaseRelation.BaseFileReader
-import org.apache.hudi.HoodieConversionUtils._
-import org.apache.hudi.HoodieDataSourceHelper.AvroDeserializerSupport
-import org.apache.hudi.avro.HoodieAvroUtils
-import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
-import org.apache.hudi.common.model._
-import org.apache.hudi.common.table.HoodieTableMetaClient
-import org.apache.hudi.common.table.cdc.{HoodieCDCFileSplit, HoodieCDCUtils}
-import org.apache.hudi.common.table.cdc.HoodieCDCInferenceCase._
-import org.apache.hudi.common.table.cdc.HoodieCDCOperation._
-import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode._
-import org.apache.hudi.common.table.log.HoodieCDCLogRecordIterator
-import org.apache.hudi.common.util.ValidationUtils.checkState
-import org.apache.hudi.config.HoodiePayloadConfig
-import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
-import org.apache.hudi.storage.StoragePath
-import org.apache.hudi.util.JFunction
-
-import org.apache.avro.Schema
-import org.apache.avro.generic.{GenericData, GenericRecord, IndexedRecord}
-import org.apache.hadoop.fs.Path
-import org.apache.spark.{Partition, SerializableWritable, TaskContext}
-import org.apache.spark.rdd.RDD
-import
org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.avro.HoodieAvroDeserializer
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Projection
-import org.apache.spark.sql.execution.datasources.PartitionedFile
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.unsafe.types.UTF8String
-
-import java.io.Closeable
-import java.util.Properties
-import java.util.stream.Collectors
-
-import scala.annotation.tailrec
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-
-/**
- * The split that will be processed by spark task.
- * The [[changes]] should be sorted first.
- */
-case class HoodieCDCFileGroupSplit(changes: Array[HoodieCDCFileSplit])
-
-/**
- * The Spark [[Partition]]'s implementation.
- */
-case class HoodieCDCFileGroupPartition(
- index: Int,
- split: HoodieCDCFileGroupSplit
-) extends Partition
-
-class HoodieCDCRDD(
- spark: SparkSession,
- metaClient: HoodieTableMetaClient,
- parquetReader: PartitionedFile => Iterator[InternalRow],
- originTableSchema: HoodieTableSchema,
- cdcSchema: StructType,
- requiredCdcSchema: StructType,
- @transient changes: Array[HoodieCDCFileGroupSplit])
- extends RDD[InternalRow](spark.sparkContext, Nil) with HoodieUnsafeRDD {
-
- @transient
- private val hadoopConf = spark.sparkContext.hadoopConfiguration
-
- private val confBroadcast = spark.sparkContext.broadcast(new
SerializableWritable(hadoopConf))
-
- private val cdcSupplementalLoggingMode =
metaClient.getTableConfig.cdcSupplementalLoggingMode
-
- private val props = HoodieFileIndex.getConfigProperties(spark, Map.empty,
metaClient.getTableConfig)
-
- protected val payloadProps: Properties =
metaClient.getTableConfig.getOrderingFieldsStr
- .map[TypedProperties](JFunction.toJavaFunction(preCombineFields =>
- HoodiePayloadConfig.newBuilder
- .withPayloadOrderingFields(preCombineFields)
- .build
- .getProps
- )).orElse(new TypedProperties())
-
- override def compute(split: Partition, context: TaskContext):
Iterator[InternalRow] = {
- val cdcPartition = split.asInstanceOf[HoodieCDCFileGroupPartition]
- new CDCFileGroupIterator(cdcPartition.split, metaClient)
- }
-
- override protected def getPartitions: Array[Partition] = {
- changes.zipWithIndex.map{ case (split, index) =>
- HoodieCDCFileGroupPartition(index, split)
- }.toArray
- }
-
- private class CDCFileGroupIterator(
- split: HoodieCDCFileGroupSplit,
- metaClient: HoodieTableMetaClient
- ) extends Iterator[InternalRow] with SparkAdapterSupport with
AvroDeserializerSupport with Closeable {
-
- private lazy val storage = metaClient.getStorage
-
- private lazy val conf = confBroadcast.value.value
-
- private lazy val basePath = metaClient.getBasePath
-
- private lazy val tableConfig = metaClient.getTableConfig
-
- private lazy val populateMetaFields = tableConfig.populateMetaFields()
-
- private lazy val keyGenerator = {
- HoodieSparkKeyGeneratorFactory.createKeyGenerator(tableConfig.getProps())
- }
-
- private lazy val recordKeyField: String = if (populateMetaFields) {
- HoodieRecord.RECORD_KEY_METADATA_FIELD
- } else {
- val keyFields = metaClient.getTableConfig.getRecordKeyFields.get()
- checkState(keyFields.length == 1)
- keyFields.head
- }
-
- private lazy val orderingFields: List[String] =
metaClient.getTableConfig.getOrderingFields.asScala.toList
-
- private lazy val tableState = {
- val metadataConfig = HoodieMetadataConfig.newBuilder()
- .fromProperties(props)
- .build()
- HoodieTableState(
- basePath.toUri.toString,
- Some(split.changes.last.getInstant),
- recordKeyField,
- orderingFields,
- usesVirtualKeys = !populateMetaFields,
- metaClient.getTableConfig.getPayloadClass,
- metadataConfig,
- // TODO support CDC with spark record
- recordMergeImplClasses = List(classOf[HoodieAvroRecordMerger].getName),
- recordMergeStrategyId =
HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID
- )
- }
-
- protected override val avroSchema: Schema = new
Schema.Parser().parse(originTableSchema.avroSchemaStr)
-
- protected override val structTypeSchema: StructType =
originTableSchema.structTypeSchema
-
- private lazy val serializer =
sparkAdapter.createAvroSerializer(originTableSchema.structTypeSchema,
- avroSchema, nullable = false)
-
- private lazy val avroProjection = AvroProjection.create(avroSchema)
-
- private lazy val cdcAvroSchema: Schema =
HoodieCDCUtils.schemaBySupplementalLoggingMode(
- cdcSupplementalLoggingMode,
- HoodieAvroUtils.removeMetadataFields(avroSchema)
- )
-
- private lazy val cdcSparkSchema: StructType =
AvroConversionUtils.convertAvroSchemaToStructType(cdcAvroSchema)
-
- private lazy val sparkPartitionedFileUtils =
sparkAdapter.getSparkPartitionedFileUtils
-
- /**
- * The deserializer used to convert the CDC GenericRecord to Spark
InternalRow.
- */
- private lazy val cdcRecordDeserializer: HoodieAvroDeserializer = {
- sparkAdapter.createAvroDeserializer(cdcAvroSchema, cdcSparkSchema)
- }
-
- private lazy val projection: Projection =
generateUnsafeProjection(cdcSchema, requiredCdcSchema)
-
- // Iterator on cdc file
- private val cdcFileIter = split.changes.iterator
-
- // The instant that is currently being processed
- private var currentInstant: String = _
-
- // The change file that is currently being processed
- private var currentCDCFileSplit: HoodieCDCFileSplit = _
-
- /**
- * Two cases will use this to iterator the records:
- * 1) extract the change data from the base file directly, including
'BASE_FILE_INSERT' and 'BASE_FILE_DELETE'.
- * 2) when the type of cdc file is 'REPLACE_COMMIT',
- * use this to trace the records that are converted from the
'[[beforeImageRecords]]
- */
- private var recordIter: Iterator[InternalRow] = Iterator.empty
-
- /**
- * Only one case where it will be used is that extract the change data
from log files for mor table.
- * At the time, 'logRecordIter' will work with [[beforeImageRecords]] that
keep all the records of the previous file slice.
- */
- private var logRecordIter: Iterator[(String, HoodieRecord[_])] =
Iterator.empty
-
- /**
- * Only one case where it will be used is that extract the change data
from cdc log files.
- */
- private var cdcLogRecordIterator: HoodieCDCLogRecordIterator = _
-
- /**
- * The next record need to be returned when call next().
- */
- protected var recordToLoad: InternalRow = _
-
- /**
- * The list of files to which 'beforeImageRecords' belong.
- * Use it to determine if 'beforeImageRecords' contains all the required
data that extract
- * the change data from the current cdc file.
- */
- private val beforeImageFiles: mutable.ArrayBuffer[String] =
mutable.ArrayBuffer.empty
-
- /**
- * Keep the before-image data. There cases will use this:
- * 1) the cdc infer case is [[LOG_FILE]];
- * 2) the cdc infer case is [[AS_IS]] and [[cdcSupplementalLoggingMode]]
is 'op_key'.
- */
- private var beforeImageRecords: mutable.Map[String, GenericRecord] =
mutable.Map.empty
-
- /**
- * Keep the after-image data. Only one case will use this:
- * the cdc infer case is [[AS_IS]] and [[cdcSupplementalLoggingMode]] is
[[OP_KEY_ONLY]] or [[DATA_BEFORE]].
- */
- private var afterImageRecords: mutable.Map[String, InternalRow] =
mutable.Map.empty
-
- private var internalRowToJsonStringConverter = new
InternalRowToJsonStringConverter(originTableSchema.structTypeSchema)
-
- private def needLoadNextFile: Boolean = {
- !recordIter.hasNext &&
- !logRecordIter.hasNext &&
- (cdcLogRecordIterator == null || !cdcLogRecordIterator.hasNext)
- }
-
- @tailrec final def hasNextInternal: Boolean = {
- if (needLoadNextFile) {
- loadCdcFile()
- }
- if (currentCDCFileSplit == null) {
- false
- } else {
- currentCDCFileSplit.getCdcInferCase match {
- case BASE_FILE_INSERT | BASE_FILE_DELETE | REPLACE_COMMIT =>
- if (recordIter.hasNext && loadNext()) {
- true
- } else {
- hasNextInternal
- }
- case LOG_FILE =>
- if (logRecordIter.hasNext && loadNext()) {
- true
- } else {
- hasNextInternal
- }
- case AS_IS =>
- if (cdcLogRecordIterator.hasNext && loadNext()) {
- true
- } else {
- hasNextInternal
- }
- }
- }
- }
-
- override def hasNext: Boolean = hasNextInternal
-
- override final def next(): InternalRow = {
- projection(recordToLoad)
- }
-
- def loadNext(): Boolean = {
- var loaded = false
- currentCDCFileSplit.getCdcInferCase match {
- case BASE_FILE_INSERT =>
- val originRecord = recordIter.next()
- recordToLoad.update(3, convertRowToJsonString(originRecord))
- loaded = true
- case BASE_FILE_DELETE =>
- val originRecord = recordIter.next()
- recordToLoad.update(2, convertRowToJsonString(originRecord))
- loaded = true
- case LOG_FILE =>
- loaded = loadNextLogRecord()
- case AS_IS =>
- val record = cdcLogRecordIterator.next().asInstanceOf[GenericRecord]
- cdcSupplementalLoggingMode match {
- case `DATA_BEFORE_AFTER` =>
- recordToLoad.update(0,
convertToUTF8String(String.valueOf(record.get(0))))
- val before = record.get(2).asInstanceOf[GenericRecord]
- recordToLoad.update(2, recordToJsonAsUTF8String(before))
- val after = record.get(3).asInstanceOf[GenericRecord]
- recordToLoad.update(3, recordToJsonAsUTF8String(after))
- case `DATA_BEFORE` =>
- val row =
cdcRecordDeserializer.deserialize(record).get.asInstanceOf[InternalRow]
- val op = row.getString(0)
- val recordKey = row.getString(1)
- recordToLoad.update(0, convertToUTF8String(op))
- val before = record.get(2).asInstanceOf[GenericRecord]
- recordToLoad.update(2, recordToJsonAsUTF8String(before))
- parse(op) match {
- case INSERT =>
- recordToLoad.update(3,
convertRowToJsonString(afterImageRecords(recordKey)))
- case UPDATE =>
- recordToLoad.update(3,
convertRowToJsonString(afterImageRecords(recordKey)))
- case _ =>
- recordToLoad.update(3, null)
- }
- case _ =>
- val row =
cdcRecordDeserializer.deserialize(record).get.asInstanceOf[InternalRow]
- val op = row.getString(0)
- val recordKey = row.getString(1)
- recordToLoad.update(0, convertToUTF8String(op))
- parse(op) match {
- case INSERT =>
- recordToLoad.update(2, null)
- recordToLoad.update(3,
convertRowToJsonString(afterImageRecords(recordKey)))
- case UPDATE =>
- recordToLoad.update(2,
recordToJsonAsUTF8String(beforeImageRecords(recordKey)))
- recordToLoad.update(3,
convertRowToJsonString(afterImageRecords(recordKey)))
- case _ =>
- recordToLoad.update(2,
recordToJsonAsUTF8String(beforeImageRecords(recordKey)))
- recordToLoad.update(3, null)
- }
- }
- loaded = true
- case REPLACE_COMMIT =>
- val originRecord = recordIter.next()
- recordToLoad.update(2, convertRowToJsonString(originRecord))
- loaded = true
- }
- loaded
- }
-
- /**
- * Load the next log record, and judge how to convert it to cdc format.
- */
- private def loadNextLogRecord(): Boolean = {
- var loaded = false
- val (key, logRecord) = logRecordIter.next()
- val indexedRecord = getInsertValue(logRecord)
- if (indexedRecord.isEmpty) {
- // it's a deleted record.
- val existingRecordOpt = beforeImageRecords.remove(key)
- if (existingRecordOpt.isEmpty) {
- // no real record is deleted, just ignore.
- logDebug(s"Cannot find any record corresponding to delete key [$key]
in the log record.")
- } else {
- // there is a real record deleted.
- recordToLoad.update(0, CDCRelation.CDC_OPERATION_DELETE)
- recordToLoad.update(2,
recordToJsonAsUTF8String(existingRecordOpt.get))
- recordToLoad.update(3, null)
- loaded = true
- }
- } else {
- val existingRecordOpt = beforeImageRecords.get(key)
- if (existingRecordOpt.isEmpty) {
- // a new record is inserted.
- val insertedRecord =
avroProjection(indexedRecord.get.asInstanceOf[GenericRecord])
- recordToLoad.update(0, CDCRelation.CDC_OPERATION_INSERT)
- recordToLoad.update(2, null)
- recordToLoad.update(3, recordToJsonAsUTF8String(insertedRecord))
- // insert into beforeImageRecords
- beforeImageRecords(key) = insertedRecord
- loaded = true
- } else {
- // a existed record is updated.
- val existingRecord = existingRecordOpt.get
- val merged = merge(existingRecord, logRecord)
- val mergeRecord = avroProjection(merged.asInstanceOf[GenericRecord])
- if (existingRecord != mergeRecord) {
- recordToLoad.update(0, CDCRelation.CDC_OPERATION_UPDATE)
- recordToLoad.update(2, recordToJsonAsUTF8String(existingRecord))
- recordToLoad.update(3, recordToJsonAsUTF8String(mergeRecord))
- // update into beforeImageRecords
- beforeImageRecords(key) = mergeRecord
- loaded = true
- }
- }
- }
- loaded
- }
-
- private def loadCdcFile(): Unit = {
- // reset all the iterator first.
- recordIter = Iterator.empty
- logRecordIter = Iterator.empty
- beforeImageRecords.clear()
- afterImageRecords.clear()
- if (cdcLogRecordIterator != null) {
- cdcLogRecordIterator.close()
- cdcLogRecordIterator = null
- }
-
- if (cdcFileIter.hasNext) {
- val split = cdcFileIter.next()
- currentInstant = split.getInstant
- currentCDCFileSplit = split
- currentCDCFileSplit.getCdcInferCase match {
- case BASE_FILE_INSERT =>
- assert(currentCDCFileSplit.getCdcFiles != null &&
currentCDCFileSplit.getCdcFiles.size() == 1)
- val absCDCPath = new StoragePath(basePath,
currentCDCFileSplit.getCdcFiles.get(0))
- val pathInfo = storage.getPathInfo(absCDCPath)
-
- val pf = sparkPartitionedFileUtils.createPartitionedFile(
- InternalRow.empty, absCDCPath, 0, pathInfo.getLength)
- recordIter = parquetReader(pf)
- case BASE_FILE_DELETE =>
- assert(currentCDCFileSplit.getBeforeFileSlice.isPresent)
- recordIter =
loadFileSlice(currentCDCFileSplit.getBeforeFileSlice.get)
- case LOG_FILE =>
- assert(currentCDCFileSplit.getCdcFiles != null &&
currentCDCFileSplit.getCdcFiles.size() == 1
- && currentCDCFileSplit.getBeforeFileSlice.isPresent)
-
loadBeforeFileSliceIfNeeded(currentCDCFileSplit.getBeforeFileSlice.get)
- val absLogPath = new StoragePath(basePath,
currentCDCFileSplit.getCdcFiles.get(0))
- val morSplit = HoodieMergeOnReadFileSplit(None, List(new
HoodieLogFile(storage.getPathInfo(absLogPath))))
- val logFileIterator = new LogFileIterator(morSplit,
originTableSchema, originTableSchema, tableState, conf)
- logRecordIter = logFileIterator.logRecordsPairIterator
- case AS_IS =>
- assert(currentCDCFileSplit.getCdcFiles != null &&
!currentCDCFileSplit.getCdcFiles.isEmpty)
- // load beforeFileSlice to beforeImageRecords
- if (currentCDCFileSplit.getBeforeFileSlice.isPresent) {
-
loadBeforeFileSliceIfNeeded(currentCDCFileSplit.getBeforeFileSlice.get)
- }
- // load afterFileSlice to afterImageRecords
- if (currentCDCFileSplit.getAfterFileSlice.isPresent) {
- val iter =
loadFileSlice(currentCDCFileSplit.getAfterFileSlice.get())
- afterImageRecords = mutable.Map.empty
- iter.foreach { row =>
- val key = getRecordKey(row)
- afterImageRecords.put(key, row.copy())
- }
- }
-
- val cdcLogFiles = currentCDCFileSplit.getCdcFiles.asScala.map {
cdcFile =>
- new HoodieLogFile(storage.getPathInfo(new StoragePath(basePath,
cdcFile)))
- }.toArray
- cdcLogRecordIterator = new HoodieCDCLogRecordIterator(storage,
cdcLogFiles, cdcAvroSchema)
- case REPLACE_COMMIT =>
- if (currentCDCFileSplit.getBeforeFileSlice.isPresent) {
-
loadBeforeFileSliceIfNeeded(currentCDCFileSplit.getBeforeFileSlice.get)
- }
- recordIter = beforeImageRecords.values.map { record =>
- deserialize(record)
- }.iterator
- beforeImageRecords.clear()
- }
- resetRecordFormat()
- } else {
- currentInstant = null
- currentCDCFileSplit = null
- }
- }
-
- /**
- * Initialize the partial fields of the data to be returned in advance to
speed up.
- */
- private def resetRecordFormat(): Unit = {
- recordToLoad = currentCDCFileSplit.getCdcInferCase match {
- case BASE_FILE_INSERT =>
- InternalRow.fromSeq(Seq(
- CDCRelation.CDC_OPERATION_INSERT,
convertToUTF8String(currentInstant),
- null, null))
- case BASE_FILE_DELETE =>
- InternalRow.fromSeq(Seq(
- CDCRelation.CDC_OPERATION_DELETE,
convertToUTF8String(currentInstant),
- null, null))
- case LOG_FILE =>
- InternalRow.fromSeq(Seq(
- null, convertToUTF8String(currentInstant),
- null, null))
- case AS_IS =>
- InternalRow.fromSeq(Seq(
- null, convertToUTF8String(currentInstant),
- null, null))
- case REPLACE_COMMIT =>
- InternalRow.fromSeq(Seq(
- CDCRelation.CDC_OPERATION_DELETE,
convertToUTF8String(currentInstant),
- null, null))
- }
- }
-
- /**
- * If [[beforeImageFiles]] are the list of file that we want to load
exactly, use this directly.
- * Otherwise we need to re-load what we need.
- */
- private def loadBeforeFileSliceIfNeeded(fileSlice: FileSlice): Unit = {
- val files = List(fileSlice.getBaseFile.get().getPath) ++
- fileSlice.getLogFiles.collect(Collectors.toList[HoodieLogFile]).asScala
- .map(f => f.getPath.toUri.toString).toList
- val same = files.sorted == beforeImageFiles.sorted.toList
- if (!same) {
- // clear up the beforeImageRecords
- beforeImageRecords.clear()
- val iter = loadFileSlice(fileSlice)
- iter.foreach { row =>
- val key = getRecordKey(row)
- // Due to the reuse buffer mechanism of Spark serialization,
- // we have to copy the serialized result if we need to retain its
reference
- beforeImageRecords.put(key, serialize(row, copy = true))
- }
- // reset beforeImageFiles
- beforeImageFiles.clear()
- beforeImageFiles.append(files: _*)
- }
- }
-
- private def loadFileSlice(fileSlice: FileSlice): Iterator[InternalRow] = {
- val baseFileInfo =
storage.getPathInfo(fileSlice.getBaseFile.get().getStoragePath)
- val basePartitionedFile =
sparkPartitionedFileUtils.createPartitionedFile(
- InternalRow.empty,
- baseFileInfo.getPath,
- 0,
- baseFileInfo.getLength
- )
- val logFiles = fileSlice.getLogFiles
- .sorted(HoodieLogFile.getLogFileComparator)
- .collect(Collectors.toList[HoodieLogFile])
- .asScala.toList
- .filterNot(_.getFileName.endsWith(HoodieCDCUtils.CDC_LOGFILE_SUFFIX))
-
- if (logFiles.isEmpty) {
- // no log files, just load the base parquet file
- parquetReader(basePartitionedFile)
- } else {
- // use [[RecordMergingFileIterator]] to load both the base file and
log files
- val morSplit = HoodieMergeOnReadFileSplit(Some(basePartitionedFile),
logFiles)
- new RecordMergingFileIterator(
- morSplit,
- BaseFileReader(parquetReader, originTableSchema.structTypeSchema),
- originTableSchema,
- originTableSchema,
- tableState,
- conf)
- }
- }
-
- /**
- * Convert InternalRow to json string.
- */
- private def convertRowToJsonString(record: InternalRow): UTF8String = {
- internalRowToJsonStringConverter.convert(record)
- }
-
- /**
- * The data of string type is stored in InternalRow using UTF8String type.
- */
- private def convertToUTF8String(str: String): UTF8String = {
- UTF8String.fromString(str)
- }
-
- private def pathToString(p: Path): String = {
- p.toUri.toString
- }
-
- private def serialize(curRowRecord: InternalRow, copy: Boolean = false):
GenericRecord = {
- val record =
serializer.serialize(curRowRecord).asInstanceOf[GenericRecord]
- if (copy) {
- GenericData.get().deepCopy(record.getSchema, record)
- } else {
- record
- }
- }
-
- private def recordToJsonAsUTF8String(record: GenericRecord): UTF8String = {
- convertToUTF8String(HoodieCDCUtils.recordToJson(record))
- }
-
- private def getRecordKey(row: InternalRow): String = {
- if (populateMetaFields) {
-
row.getString(structTypeSchema.fieldIndex(HoodieRecord.RECORD_KEY_METADATA_FIELD))
- } else {
- this.keyGenerator.getKey(serialize(row)).getRecordKey
- }
- }
-
- private def getInsertValue(
- record: HoodieRecord[_])
- : Option[IndexedRecord] = {
- toScalaOption(record.toIndexedRecord(avroSchema,
payloadProps)).map(_.getData)
- }
-
- private def merge(curAvroRecord: GenericRecord, newRecord:
HoodieRecord[_]): IndexedRecord = {
-
newRecord.getData.asInstanceOf[HoodieRecordPayload[_]].combineAndGetUpdateValue(curAvroRecord,
avroSchema, payloadProps).get()
- }
-
- override def close(): Unit = {}
- }
-}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/FileFormatUtilsForFileGroupReader.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/FileFormatUtilsForFileGroupReader.scala
index 6feadb70333e..ae6740b3017a 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/FileFormatUtilsForFileGroupReader.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/FileFormatUtilsForFileGroupReader.scala
@@ -18,11 +18,12 @@
package org.apache.spark.sql
-import org.apache.hudi.{HoodieCDCFileIndex, SparkAdapterSupport,
SparkHoodieTableFileIndex}
+import org.apache.hudi.{SparkAdapterSupport, SparkHoodieTableFileIndex}
+import org.apache.hudi.cdc.HoodieCDCFileIndex
import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Contains,
EndsWith, EqualNullSafe, EqualTo, Expression, GreaterThan, GreaterThanOrEqual,
In, IsNotNull, IsNull, LessThan, LessThanOrEqual, Literal, NamedExpression,
Not, Or, StartsWith}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan,
Project}
-import org.apache.spark.sql.execution.datasources.{HadoopFsRelation}
+import org.apache.spark.sql.execution.datasources.HadoopFsRelation
import org.apache.spark.sql.execution.datasources.parquet.{HoodieFormatTrait,
ParquetFileFormat}
import org.apache.spark.sql.types.{BooleanType, StructType}
@@ -34,7 +35,7 @@ object FileFormatUtilsForFileGroupReader extends
SparkAdapterSupport {
val ff = fs.fileFormat.asInstanceOf[ParquetFileFormat with
HoodieFormatTrait]
ff.isProjected = true
val tableSchema = fs.location match {
- case index: HoodieCDCFileIndex => index.cdcRelation.schema
+ case _: HoodieCDCFileIndex => HoodieCDCFileIndex.FULL_CDC_SPARK_SCHEMA
case index: SparkHoodieTableFileIndex => index.schema
}
val resolvedSchema = logicalRelation.resolve(tableSchema,
fs.sparkSession.sessionState.analyzer.resolver)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala
index 1afc7d84bff1..718dc827df53 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.parquet
import org.apache.hudi.{AvroConversionUtils, HoodieFileIndex,
HoodiePartitionCDCFileGroupMapping, HoodiePartitionFileSliceMapping,
HoodieSparkUtils, HoodieTableSchema, SparkAdapterSupport,
SparkFileFormatInternalRowReaderContext}
import org.apache.hudi.avro.AvroSchemaUtils
-import org.apache.hudi.cdc.{CDCFileGroupIterator, CDCRelation,
HoodieCDCFileGroupSplit}
+import org.apache.hudi.cdc.{CDCFileGroupIterator, HoodieCDCFileGroupSplit,
HoodieCDCFileIndex}
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.client.utils.SparkInternalSchemaConverter
import org.apache.hudi.common.config.{HoodieMemoryConfig, TypedProperties}
@@ -327,7 +327,7 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath:
String,
val fileSplits =
hoodiePartitionCDCFileGroupSliceMapping.getFileSplits().toArray
val cdcFileGroupSplit: HoodieCDCFileGroupSplit =
HoodieCDCFileGroupSplit(fileSplits)
props.setProperty(HoodieTableConfig.HOODIE_TABLE_NAME_KEY, tableName)
- val cdcSchema = CDCRelation.FULL_CDC_SPARK_SCHEMA
+ val cdcSchema = HoodieCDCFileIndex.FULL_CDC_SPARK_SCHEMA
new CDCFileGroupIterator(
cdcFileGroupSplit,
metaClient,
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV1.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV1.scala
index 87d3e334e8be..60fa89993489 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV1.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV1.scala
@@ -21,11 +21,10 @@ package org.apache.spark.sql.hudi.streaming
import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions,
HoodieCopyOnWriteCDCHadoopFsRelationFactory,
HoodieCopyOnWriteIncrementalHadoopFsRelationFactoryV1,
HoodieMergeOnReadCDCHadoopFsRelationFactory,
HoodieMergeOnReadIncrementalHadoopFsRelationFactoryV1, HoodieSparkUtils,
IncrementalRelationV1, MergeOnReadIncrementalRelationV1, SparkAdapterSupport}
import
org.apache.hudi.DataSourceReadOptions.INCREMENTAL_READ_HANDLE_HOLLOW_COMMIT
-import org.apache.hudi.cdc.CDCRelation
+import org.apache.hudi.cdc.HoodieCDCFileIndex
import org.apache.hudi.common.config.HoodieReaderConfig
import org.apache.hudi.common.model.HoodieTableType
import org.apache.hudi.common.table.{HoodieTableMetaClient,
HoodieTableVersion, TableSchemaResolver}
-import org.apache.hudi.common.table.cdc.HoodieCDCUtils
import org.apache.hudi.common.table.checkpoint.{CheckpointUtils,
StreamerCheckpointV1}
import
org.apache.hudi.common.table.timeline.TimelineUtils.{handleHollowCommitIfNeeded,
HollowCommitHandling}
import
org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling._
@@ -64,7 +63,7 @@ class HoodieStreamSourceV1(sqlContext: SQLContext,
private lazy val isBootstrappedTable =
metaClient.getTableConfig.getBootstrapBasePath.isPresent
- private val isCDCQuery = CDCRelation.isCDCEnabled(metaClient) &&
+ private val isCDCQuery = HoodieCDCFileIndex.isCDCEnabled(metaClient) &&
parameters.get(DataSourceReadOptions.QUERY_TYPE.key).contains(DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
&&
parameters.get(DataSourceReadOptions.INCREMENTAL_FORMAT.key).contains(DataSourceReadOptions.INCREMENTAL_FORMAT_CDC_VAL)
@@ -100,7 +99,7 @@ class HoodieStreamSourceV1(sqlContext: SQLContext,
override def schema: StructType = {
if (isCDCQuery) {
- CDCRelation.FULL_CDC_SPARK_SCHEMA
+ HoodieCDCFileIndex.FULL_CDC_SPARK_SCHEMA
} else {
schemaOption.getOrElse {
val schemaUtil = new TableSchemaResolver(metaClient)
@@ -154,20 +153,14 @@ class HoodieStreamSourceV1(sqlContext: SQLContext,
DataSourceReadOptions.START_COMMIT.key()->
startCommitTime(startOffset),
DataSourceReadOptions.END_COMMIT.key() -> endOffset.offsetCommitTime
)
- if (enableFileGroupReader) {
- val relation = if (tableType == HoodieTableType.COPY_ON_WRITE) {
- new HoodieCopyOnWriteCDCHadoopFsRelationFactory(
- sqlContext, metaClient, parameters ++ cdcOptions, schemaOption,
isBootstrappedTable).build()
- } else {
- new HoodieMergeOnReadCDCHadoopFsRelationFactory(
- sqlContext, metaClient, parameters ++ cdcOptions, schemaOption,
isBootstrappedTable).build()
- }
- sparkAdapter.createStreamingDataFrame(sqlContext, relation,
CDCRelation.FULL_CDC_SPARK_SCHEMA)
+ val relation = if (tableType == HoodieTableType.COPY_ON_WRITE) {
+ new HoodieCopyOnWriteCDCHadoopFsRelationFactory(
+ sqlContext, metaClient, parameters ++ cdcOptions, schemaOption,
isBootstrappedTable).build()
} else {
- val rdd = CDCRelation.getCDCRelation(sqlContext, metaClient,
cdcOptions)
- .buildScan0(HoodieCDCUtils.CDC_COLUMNS, Array.empty)
-
- sparkAdapter.internalCreateDataFrame(sqlContext.sparkSession, rdd,
CDCRelation.FULL_CDC_SPARK_SCHEMA, isStreaming = true)}
+ new HoodieMergeOnReadCDCHadoopFsRelationFactory(
+ sqlContext, metaClient, parameters ++ cdcOptions, schemaOption,
isBootstrappedTable).build()
+ }
+ sparkAdapter.createStreamingDataFrame(sqlContext, relation,
HoodieCDCFileIndex.FULL_CDC_SPARK_SCHEMA)
} else {
// Consume the data between (startCommitTime, endCommitTime]
val incParams = parameters ++ Map(
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV2.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV2.scala
index 3eebc4639179..c41657d74120 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV2.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV2.scala
@@ -18,11 +18,10 @@
package org.apache.spark.sql.hudi.streaming
import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions,
HoodieCopyOnWriteCDCHadoopFsRelationFactory,
HoodieCopyOnWriteIncrementalHadoopFsRelationFactoryV2,
HoodieMergeOnReadCDCHadoopFsRelationFactory,
HoodieMergeOnReadIncrementalHadoopFsRelationFactoryV2, HoodieSparkUtils,
IncrementalRelationV2, MergeOnReadIncrementalRelationV2, SparkAdapterSupport}
-import org.apache.hudi.cdc.CDCRelation
+import org.apache.hudi.cdc.HoodieCDCFileIndex
import org.apache.hudi.common.config.HoodieReaderConfig
import org.apache.hudi.common.model.HoodieTableType
import org.apache.hudi.common.table.{HoodieTableMetaClient,
HoodieTableVersion, TableSchemaResolver}
-import org.apache.hudi.common.table.cdc.HoodieCDCUtils
import org.apache.hudi.common.table.checkpoint.{CheckpointUtils,
StreamerCheckpointV2}
import org.apache.hudi.common.table.log.InstantRange.RangeType
import org.apache.hudi.util.SparkConfigUtils
@@ -59,7 +58,7 @@ class HoodieStreamSourceV2(sqlContext: SQLContext,
private lazy val enableFileGroupReader = SparkConfigUtils
.getStringWithAltKeys(parameters,
HoodieReaderConfig.FILE_GROUP_READER_ENABLED).toBoolean
- private val isCDCQuery = CDCRelation.isCDCEnabled(metaClient) &&
+ private val isCDCQuery = HoodieCDCFileIndex.isCDCEnabled(metaClient) &&
parameters.get(DataSourceReadOptions.QUERY_TYPE.key).contains(DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
&&
parameters.get(DataSourceReadOptions.INCREMENTAL_FORMAT.key).contains(DataSourceReadOptions.INCREMENTAL_FORMAT_CDC_VAL)
@@ -82,7 +81,7 @@ class HoodieStreamSourceV2(sqlContext: SQLContext,
override def schema: StructType = {
if (isCDCQuery) {
- CDCRelation.FULL_CDC_SPARK_SCHEMA
+ HoodieCDCFileIndex.FULL_CDC_SPARK_SCHEMA
} else {
schemaOption.getOrElse {
val schemaUtil = new TableSchemaResolver(metaClient)
@@ -130,20 +129,14 @@ class HoodieStreamSourceV2(sqlContext: SQLContext,
DataSourceReadOptions.START_COMMIT.key() -> startCompletionTime,
DataSourceReadOptions.END_COMMIT.key() -> endOffset.offsetCommitTime
)
- if (enableFileGroupReader) {
- val relation = if (tableType == HoodieTableType.COPY_ON_WRITE) {
- new HoodieCopyOnWriteCDCHadoopFsRelationFactory(
- sqlContext, metaClient, parameters ++ cdcOptions, schemaOption,
isBootstrappedTable, rangeType).build()
- } else {
- new HoodieMergeOnReadCDCHadoopFsRelationFactory(
- sqlContext, metaClient, parameters ++ cdcOptions, schemaOption,
isBootstrappedTable, rangeType).build()
- }
- sparkAdapter.createStreamingDataFrame(sqlContext, relation,
CDCRelation.FULL_CDC_SPARK_SCHEMA)
+ val relation = if (tableType == HoodieTableType.COPY_ON_WRITE) {
+ new HoodieCopyOnWriteCDCHadoopFsRelationFactory(
+ sqlContext, metaClient, parameters ++ cdcOptions, schemaOption,
isBootstrappedTable, rangeType).build()
} else {
- val rdd = CDCRelation.getCDCRelation(sqlContext, metaClient,
cdcOptions, rangeType)
- .buildScan0(HoodieCDCUtils.CDC_COLUMNS, Array.empty)
-
- sparkAdapter.internalCreateDataFrame(sqlContext.sparkSession, rdd,
CDCRelation.FULL_CDC_SPARK_SCHEMA, isStreaming = true)}
+ new HoodieMergeOnReadCDCHadoopFsRelationFactory(
+ sqlContext, metaClient, parameters ++ cdcOptions, schemaOption,
isBootstrappedTable, rangeType).build()
+ }
+ sparkAdapter.createStreamingDataFrame(sqlContext, relation,
HoodieCDCFileIndex.FULL_CDC_SPARK_SCHEMA)
} else {
// Consume the data between (startCommitTime, endCommitTime]
val incParams = parameters ++ Map(