This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new dcb54fc7fdd [HUDI-6790] Support incremental/CDC queries using
HadoopFsRelation (#9888)
dcb54fc7fdd is described below
commit dcb54fc7fdd446051718273d4a2ff0a70f59054c
Author: Lin Liu <[email protected]>
AuthorDate: Tue Nov 7 11:42:10 2023 -0800
[HUDI-6790] Support incremental/CDC queries using HadoopFsRelation (#9888)
---
.../main/scala/org/apache/hudi/DefaultSource.scala | 73 ++-
.../scala/org/apache/hudi/HoodieBaseRelation.scala | 3 +
.../scala/org/apache/hudi/HoodieCDCFileIndex.scala | 75 +++
.../scala/org/apache/hudi/HoodieFileIndex.scala | 11 +-
.../hudi/HoodieHadoopFsRelationFactory.scala | 377 ++++++++++++++
.../apache/hudi/HoodieIncrementalFileIndex.scala | 120 +++++
.../hudi/HoodiePartitionCDCFileGroupMapping.scala | 39 ++
.../hudi/HoodiePartitionFileSliceMapping.scala | 37 ++
.../org/apache/hudi/HoodiePartitionValues.scala | 107 ++++
.../apache/hudi/HoodieSparkFileFormatUtils.scala | 13 +-
.../hudi/MergeOnReadIncrementalRelation.scala | 34 +-
.../apache/hudi/PartitionFileSliceMapping.scala | 76 ---
.../org/apache/hudi/cdc/CDCFileGroupIterator.scala | 558 +++++++++++++++++++++
.../datasources/HoodieMultipleBaseFileFormat.scala | 25 +-
...odieFileGroupReaderBasedParquetFileFormat.scala | 80 ++-
.../parquet/NewHoodieParquetFileFormat.scala | 25 +-
16 files changed, 1505 insertions(+), 148 deletions(-)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
index 10b77d4b91c..7bdaa9609ef 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -235,13 +235,20 @@ object DefaultSource {
Option(schema)
}
+ val useNewPaquetFileFormat = parameters.getOrElse(
+ USE_NEW_HUDI_PARQUET_FILE_FORMAT.key,
USE_NEW_HUDI_PARQUET_FILE_FORMAT.defaultValue).toBoolean
if (metaClient.getCommitsTimeline.filterCompletedInstants.countInstants()
== 0) {
new EmptyRelation(sqlContext, resolveSchema(metaClient, parameters,
Some(schema)))
} else if (isCdcQuery) {
- CDCRelation.getCDCRelation(sqlContext, metaClient, parameters)
+ if (useNewPaquetFileFormat) {
+ new HoodieMergeOnReadCDCHadoopFsRelationFactory(
+ sqlContext, metaClient, parameters, userSchema, isBootstrap =
false).build()
+ } else {
+ CDCRelation.getCDCRelation(sqlContext, metaClient, parameters)
+ }
} else {
lazy val fileFormatUtils = if ((isMultipleBaseFileFormatsEnabled &&
!isBootstrappedTable)
- || (parameters.getOrElse(USE_NEW_HUDI_PARQUET_FILE_FORMAT.key,
USE_NEW_HUDI_PARQUET_FILE_FORMAT.defaultValue).toBoolean
+ || (useNewPaquetFileFormat
&& (globPaths == null || globPaths.isEmpty)
&& parameters.getOrElse(REALTIME_MERGE.key(),
REALTIME_MERGE.defaultValue())
.equalsIgnoreCase(REALTIME_PAYLOAD_COMBINE_OPT_VAL))) {
@@ -262,33 +269,67 @@ object DefaultSource {
case (COPY_ON_WRITE, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) |
(COPY_ON_WRITE, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) |
(MERGE_ON_READ, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) =>
- resolveBaseFileOnlyRelation(sqlContext, globPaths, userSchema,
metaClient, parameters)
+ if (fileFormatUtils.isDefined) {
+ new HoodieCopyOnWriteSnapshotHadoopFsRelationFactory(
+ sqlContext, metaClient, parameters, userSchema, isBootstrap =
false).build()
+ } else {
+ resolveBaseFileOnlyRelation(sqlContext, globPaths, userSchema,
metaClient, parameters)
+ }
- case (COPY_ON_WRITE, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
- new IncrementalRelation(sqlContext, parameters, userSchema,
metaClient)
+ case (COPY_ON_WRITE, QUERY_TYPE_INCREMENTAL_OPT_VAL, false) =>
+ if (fileFormatUtils.isDefined) {
+ new HoodieCopyOnWriteIncrementalHadoopFsRelationFactory(
+ sqlContext, metaClient, parameters, userSchema, isBootstrap =
false).build()
+ } else {
+ new IncrementalRelation(sqlContext, parameters, userSchema,
metaClient)
+ }
- case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) =>
- if (fileFormatUtils.isEmpty) {
- new MergeOnReadSnapshotRelation(sqlContext, parameters,
metaClient, globPaths, userSchema)
+ case (COPY_ON_WRITE, QUERY_TYPE_INCREMENTAL_OPT_VAL, true) =>
+ if (fileFormatUtils.isDefined) {
+ new HoodieCopyOnWriteIncrementalHadoopFsRelationFactory(
+ sqlContext, metaClient, parameters, userSchema, isBootstrap =
true).build()
} else {
- fileFormatUtils.get.getHadoopFsRelation(isMOR = true, isBootstrap
= false)
+ new IncrementalRelation(sqlContext, parameters, userSchema,
metaClient)
}
- case (MERGE_ON_READ, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
- new MergeOnReadIncrementalRelation(sqlContext, parameters,
metaClient, userSchema)
+ case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) =>
+ if (fileFormatUtils.isDefined) {
+ new HoodieMergeOnReadSnapshotHadoopFsRelationFactory(
+ sqlContext, metaClient, parameters, userSchema, isBootstrap =
false).build()
+ } else {
+ new MergeOnReadSnapshotRelation(sqlContext, parameters,
metaClient, globPaths, userSchema)
+ }
case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, true) =>
- if (fileFormatUtils.isEmpty) {
+ if (fileFormatUtils.isDefined) {
+ new HoodieMergeOnReadSnapshotHadoopFsRelationFactory(
+ sqlContext, metaClient, parameters, userSchema, isBootstrap =
false).build()
+ } else {
new HoodieBootstrapMORRelation(sqlContext, userSchema, globPaths,
metaClient, parameters)
+ }
+
+ case (MERGE_ON_READ, QUERY_TYPE_INCREMENTAL_OPT_VAL, true) =>
+ if (fileFormatUtils.isDefined) {
+ new HoodieMergeOnReadIncrementalHadoopFsRelationFactory(
+ sqlContext, metaClient, parameters, userSchema, isBootstrap =
true).build()
} else {
- fileFormatUtils.get.getHadoopFsRelation(isMOR = true, isBootstrap
= true)
+ MergeOnReadIncrementalRelation(sqlContext, parameters, metaClient,
userSchema)
+ }
+
+ case (MERGE_ON_READ, QUERY_TYPE_INCREMENTAL_OPT_VAL, false) =>
+ if (fileFormatUtils.isDefined) {
+ new HoodieMergeOnReadIncrementalHadoopFsRelationFactory(
+ sqlContext, metaClient, parameters, userSchema, isBootstrap =
false).build()
+ } else {
+ MergeOnReadIncrementalRelation(sqlContext, parameters, metaClient,
userSchema)
}
case (_, _, true) =>
- if (fileFormatUtils.isEmpty) {
- resolveHoodieBootstrapRelation(sqlContext, globPaths, userSchema,
metaClient, parameters)
+ if (fileFormatUtils.isDefined) {
+ new HoodieCopyOnWriteSnapshotHadoopFsRelationFactory(
+ sqlContext, metaClient, parameters, userSchema, isBootstrap =
true).build()
} else {
- fileFormatUtils.get.getHadoopFsRelation(isMOR = false, isBootstrap
= true)
+ resolveHoodieBootstrapRelation(sqlContext, globPaths, userSchema,
metaClient, parameters)
}
case (_, _, _) =>
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index c791e8417ca..78c5cc4ca47 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -480,6 +480,9 @@ abstract class HoodieBaseRelation(val sqlContext:
SQLContext,
protected def getPartitionColumnsAsInternalRow(file: FileStatus):
InternalRow =
getPartitionColumnsAsInternalRowInternal(file, metaClient.getBasePathV2,
shouldExtractPartitionValuesFromPartitionPath)
+ protected def getPartitionColumnValuesAsInternalRow(file: FileStatus):
InternalRow =
+ getPartitionColumnsAsInternalRowInternal(file, metaClient.getBasePathV2,
extractPartitionValuesFromPartitionPath = true)
+
protected def getPartitionColumnsAsInternalRowInternal(file: FileStatus,
basePath: Path,
extractPartitionValuesFromPartitionPath: Boolean): InternalRow = {
if (extractPartitionValuesFromPartitionPath) {
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCDCFileIndex.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCDCFileIndex.scala
new file mode 100644
index 00000000000..cb46b19b88b
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCDCFileIndex.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hudi.cdc.CDCRelation
+import org.apache.hudi.common.model.HoodieFileGroupId
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.cdc.HoodieCDCExtractor
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Expression,
GenericInternalRow}
+import org.apache.spark.sql.execution.datasources.{FileIndex, FileStatusCache,
NoopCache, PartitionDirectory}
+import org.apache.spark.sql.types.StructType
+
+import scala.jdk.CollectionConverters.{asScalaBufferConverter,
mapAsScalaMapConverter}
+
+class HoodieCDCFileIndex (override val spark: SparkSession,
+ override val metaClient: HoodieTableMetaClient,
+ override val schemaSpec: Option[StructType],
+ override val options: Map[String, String],
+ @transient override val fileStatusCache:
FileStatusCache = NoopCache,
+ override val includeLogFiles: Boolean)
+ extends HoodieIncrementalFileIndex(
+ spark, metaClient, schemaSpec, options, fileStatusCache, includeLogFiles
+ ) with FileIndex {
+ val cdcRelation: CDCRelation = CDCRelation.getCDCRelation(spark.sqlContext,
metaClient, options)
+ val cdcExtractor: HoodieCDCExtractor = cdcRelation.cdcExtractor
+
+ override def listFiles(partitionFilters: Seq[Expression], dataFilters:
Seq[Expression]): Seq[PartitionDirectory] = {
+ val partitionToFileGroups =
cdcExtractor.extractCDCFileSplits().asScala.groupBy(_._1.getPartitionPath).toSeq
+ partitionToFileGroups.map {
+ case (partitionPath, fileGroups) =>
+ val fileGroupIds: List[FileStatus] = fileGroups.map { fileGroup => {
+ // We create a fake FileStatus to wrap the information of
HoodieFileGroupId, which are used
+ // later to retrieve the corresponding CDC file group splits.
+ val fileGroupId: HoodieFileGroupId = fileGroup._1
+ new FileStatus(0, true, 0, 0, 0,
+ 0, null, "", "", null,
+ new Path(fileGroupId.getPartitionPath, fileGroupId.getFileId))
+ }}.toList
+ val partitionValues: InternalRow = new
GenericInternalRow(doParsePartitionColumnValues(
+ metaClient.getTableConfig.getPartitionFields.get(),
partitionPath).asInstanceOf[Array[Any]])
+ PartitionDirectory(
+ new HoodiePartitionCDCFileGroupMapping(
+ partitionValues, fileGroups.map(kv => kv._1 ->
kv._2.asScala.toList).toMap),
+ fileGroupIds
+ )
+ }
+ }
+
+ override def inputFiles: Array[String] = {
+ cdcExtractor.extractCDCFileSplits().asScala.map { fileGroupSplit =>
+ val fileGroupId = fileGroupSplit._1
+ new Path(fileGroupId.getPartitionPath, fileGroupId.getFileId).toString
+ }.toArray
+ }
+}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index 28e1b54473e..aaefd715fa3 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -76,7 +76,8 @@ case class HoodieFileIndex(spark: SparkSession,
schemaSpec: Option[StructType],
options: Map[String, String],
@transient fileStatusCache: FileStatusCache =
NoopCache,
- includeLogFiles: Boolean = false)
+ includeLogFiles: Boolean = false,
+ shouldEmbedFileSlices: Boolean = false)
extends SparkHoodieTableFileIndex(
spark = spark,
metaClient = metaClient,
@@ -88,7 +89,7 @@ case class HoodieFileIndex(spark: SparkSession,
)
with FileIndex {
- @transient private var hasPushedDownPartitionPredicates: Boolean = false
+ @transient protected var hasPushedDownPartitionPredicates: Boolean = false
/**
* NOTE: [[ColumnStatsIndexSupport]] is a transient state, since it's only
relevant while logical plan
@@ -110,8 +111,6 @@ case class HoodieFileIndex(spark: SparkSession,
override def rootPaths: Seq[Path] = getQueryPaths.asScala
- var shouldEmbedFileSlices: Boolean = false
-
/**
* Returns the FileStatus for all the base files (excluding log files). This
should be used only for
* cases where Spark directly fetches the list of files via HoodieFileIndex
or for read optimized query logic
@@ -168,7 +167,7 @@ case class HoodieFileIndex(spark: SparkSession,
|| (f.getBaseFile.isPresent &&
f.getBaseFile.get().getBootstrapBaseFile.isPresent)).
foldLeft(Map[String, FileSlice]()) { (m, f) => m + (f.getFileId ->
f) }
if (c.nonEmpty) {
- PartitionDirectory(new
PartitionFileSliceMapping(InternalRow.fromSeq(partitionOpt.get.values), c),
baseFileStatusesAndLogFileOnly)
+ PartitionDirectory(new
HoodiePartitionFileSliceMapping(InternalRow.fromSeq(partitionOpt.get.values),
c), baseFileStatusesAndLogFileOnly)
} else {
PartitionDirectory(InternalRow.fromSeq(partitionOpt.get.values),
baseFileStatusesAndLogFileOnly)
}
@@ -293,7 +292,7 @@ case class HoodieFileIndex(spark: SparkSession,
* In the fast bootstrap read code path, it gets the file status for the
bootstrap base file instead of
* skeleton file. Returns file status for the base file if available.
*/
- private def getBaseFileStatus(baseFileOpt: Option[HoodieBaseFile]):
Option[FileStatus] = {
+ protected def getBaseFileStatus(baseFileOpt: Option[HoodieBaseFile]):
Option[FileStatus] = {
baseFileOpt.map(baseFile => {
if (shouldFastBootstrap) {
if (baseFile.getBootstrapBaseFile.isPresent) {
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
new file mode 100644
index 00000000000..7a3ea7483fd
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.avro.Schema
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hudi.HoodieBaseRelation.{convertToAvroSchema,
isSchemaEvolutionEnabledOnRead}
+import org.apache.hudi.HoodieConversionUtils.toScalaOption
+import org.apache.hudi.HoodieFileIndex.getConfigProperties
+import
org.apache.hudi.common.config.HoodieMetadataConfig.{DEFAULT_METADATA_ENABLE_FOR_READERS,
ENABLE}
+import org.apache.hudi.common.config.{ConfigProperty, HoodieMetadataConfig,
HoodieReaderConfig, TypedProperties}
+import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.table.timeline.HoodieTimeline
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient,
TableSchemaResolver}
+import org.apache.hudi.common.util.{ConfigUtils, StringUtils}
+import org.apache.hudi.common.util.ValidationUtils.checkState
+import org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.internal.schema.{HoodieSchemaException, InternalSchema}
+import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
+import org.apache.hudi.metadata.HoodieTableMetadataUtil
+import org.apache.spark.sql.catalyst.analysis.Resolver
+import org.apache.spark.sql.{SQLContext, SparkSession}
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import
org.apache.spark.sql.execution.datasources.parquet.{HoodieFileGroupReaderBasedParquetFileFormat,
NewHoodieParquetFileFormat}
+import org.apache.spark.sql.execution.datasources.{FileFormat, FileIndex,
FileStatusCache, HadoopFsRelation, HoodieMultipleBaseFileFormat}
+import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
+import org.apache.spark.sql.types.StructType
+
+import scala.collection.JavaConverters._
+import scala.util.{Failure, Success, Try}
+
+trait HoodieHadoopFsRelationFactory {
+ def build(): HadoopFsRelation
+ def buildFileIndex(): FileIndex
+ def buildFileFormat(): FileFormat
+ def buildPartitionSchema(): StructType
+ def buildDataSchema(): StructType
+ def buildBucketSpec(): Option[BucketSpec]
+ def buildOptions(): Map[String, String]
+}
+
+abstract class HoodieBaseHadoopFsRelationFactory(val sqlContext: SQLContext,
+ val metaClient:
HoodieTableMetaClient,
+ val options: Map[String,
String],
+ val schemaSpec:
Option[StructType]
+ ) extends SparkAdapterSupport
with HoodieHadoopFsRelationFactory {
+ protected lazy val sparkSession: SparkSession = sqlContext.sparkSession
+ protected lazy val optParams: Map[String, String] = options.filter(kv =>
!kv._1.equals(DATA_QUERIES_ONLY.key()))
+ protected lazy val hadoopConfig: Configuration = new
Configuration(sqlContext.sparkContext.hadoopConfiguration)
+ protected lazy val jobConf = new JobConf(hadoopConfig)
+
+ protected lazy val resolver: Resolver =
sparkSession.sessionState.analyzer.resolver
+ protected lazy val metaFieldNames: Set[String] =
HoodieRecord.HOODIE_META_COLUMNS.asScala.toSet
+
+ protected lazy val tableName: String = metaClient.getTableConfig.getTableName
+ protected lazy val tableConfig: HoodieTableConfig = metaClient.getTableConfig
+ protected lazy val basePath: Path = metaClient.getBasePathV2
+ protected lazy val partitionColumns: Array[String] =
tableConfig.getPartitionFields.orElse(Array.empty)
+
+ protected lazy val (tableAvroSchema: Schema, internalSchemaOpt:
Option[InternalSchema]) = {
+ val schemaResolver = new TableSchemaResolver(metaClient)
+ val internalSchemaOpt = if (!isSchemaEvolutionEnabledOnRead(optParams,
sparkSession)) {
+ None
+ } else {
+ Try {
+
specifiedQueryTimestamp.map(schemaResolver.getTableInternalSchemaFromCommitMetadata)
+ .getOrElse(schemaResolver.getTableInternalSchemaFromCommitMetadata)
+ } match {
+ case Success(internalSchemaOpt) => toScalaOption(internalSchemaOpt)
+ case Failure(e) =>
+ None
+ }
+ }
+
+ val (name, namespace) =
AvroConversionUtils.getAvroRecordNameAndNamespace(tableName)
+ val avroSchema = internalSchemaOpt.map { is =>
+ AvroInternalSchemaConverter.convert(is, namespace + "." + name)
+ } orElse {
+ specifiedQueryTimestamp.map(schemaResolver.getTableAvroSchema)
+ } orElse {
+ schemaSpec.map(s => convertToAvroSchema(s, tableName))
+ } getOrElse {
+ Try(schemaResolver.getTableAvroSchema) match {
+ case Success(schema) => schema
+ case Failure(e) =>
+ throw new HoodieSchemaException("Failed to fetch schema from the
table")
+ }
+ }
+
+ (avroSchema, internalSchemaOpt)
+ }
+
+ protected lazy val tableStructSchema: StructType = {
+ val converted =
AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema)
+ val metaFieldMetadata = sparkAdapter.createCatalystMetadataForMetaField
+
+ // NOTE: Here we annotate meta-fields with corresponding metadata such
that Spark (>= 3.2)
+ // is able to recognize such fields as meta-fields
+ StructType(converted.map { field =>
+ if (metaFieldNames.exists(metaFieldName => resolver(metaFieldName,
field.name))) {
+ field.copy(metadata = metaFieldMetadata)
+ } else {
+ field
+ }
+ })
+ }
+
+ protected lazy val preCombineFieldOpt: Option[String] =
+ Option(tableConfig.getPreCombineField)
+ .orElse(optParams.get(DataSourceWriteOptions.PRECOMBINE_FIELD.key))
match {
+ // NOTE: This is required to compensate for cases when empty string is
used to stub
+ // property value to avoid it being set with the default value
+ // TODO(HUDI-3456) cleanup
+ case Some(f) if !StringUtils.isNullOrEmpty(f) => Some(f)
+ case _ => None
+ }
+
+ protected lazy val recordKeyField: String =
+ if (tableConfig.populateMetaFields()) {
+ HoodieRecord.RECORD_KEY_METADATA_FIELD
+ } else {
+ val keyFields = tableConfig.getRecordKeyFields.get()
+ checkState(keyFields.length == 1)
+ keyFields.head
+ }
+
+ protected lazy val specifiedQueryTimestamp: Option[String] =
+ optParams.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key)
+ .map(HoodieSqlCommonUtils.formatQueryInstant)
+
+ protected val mergeType: String =
optParams.getOrElse(DataSourceReadOptions.REALTIME_MERGE.key,
+ DataSourceReadOptions.REALTIME_MERGE.defaultValue)
+ protected val recordMergerImpls =
ConfigUtils.split2List(getConfigValue(HoodieWriteConfig.RECORD_MERGER_IMPLS)).asScala.toList
+ protected val recordMergerStrategy =
getConfigValue(HoodieWriteConfig.RECORD_MERGER_STRATEGY,
+ Option(metaClient.getTableConfig.getRecordMergerStrategy))
+
+ protected val shouldExtractPartitionValuesFromPartitionPath: Boolean = {
+ // Controls whether partition columns (which are the source for the
partition path values) should
+ // be omitted from persistence in the data files. On the read path it
affects whether partition values (values
+ // of partition columns) will be read from the data file or extracted from
partition path
+ val shouldOmitPartitionColumns =
metaClient.getTableConfig.shouldDropPartitionColumns &&
partitionColumns.nonEmpty
+ val shouldExtractPartitionValueFromPath =
+
optParams.getOrElse(DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.key,
+
DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.defaultValue.toString).toBoolean
+ val shouldUseBootstrapFastRead =
optParams.getOrElse(DATA_QUERIES_ONLY.key(), "false").toBoolean
+
+ shouldOmitPartitionColumns || shouldExtractPartitionValueFromPath ||
shouldUseBootstrapFastRead
+ }
+
+ protected lazy val mandatoryFieldsForMerging: Seq[String] =
+ Seq(recordKeyField) ++ preCombineFieldOpt.map(Seq(_)).getOrElse(Seq())
+
+ protected lazy val fileGroupReaderEnabled: Boolean =
checkIfAConfigurationEnabled(HoodieReaderConfig.FILE_GROUP_READER_ENABLED)
+
+ protected lazy val shouldUseRecordPosition: Boolean =
checkIfAConfigurationEnabled(HoodieWriteConfig.WRITE_RECORD_POSITIONS)
+
+ protected def queryTimestamp: Option[String] =
+
specifiedQueryTimestamp.orElse(toScalaOption(timeline.lastInstant()).map(_.getTimestamp))
+
+ protected def hasSchemaOnRead: Boolean = internalSchemaOpt.isDefined
+
+ protected def timeline: HoodieTimeline =
+ // NOTE: We're including compaction here since it's not considering a
"commit" operation
+ metaClient.getCommitsAndCompactionTimeline.filterCompletedInstants
+
+ protected def getConfigValue(config: ConfigProperty[String],
+ defaultValueOption: Option[String] =
Option.empty): String = {
+ optParams.getOrElse(config.key(),
+ sqlContext.getConf(config.key(),
defaultValueOption.getOrElse(config.defaultValue())))
+ }
+
+ protected def checkIfAConfigurationEnabled(config:
ConfigProperty[java.lang.Boolean],
+ defaultValueOption: Option[String]
= Option.empty): Boolean = {
+ optParams.getOrElse(config.key(),
+ sqlContext.getConf(config.key(),
defaultValueOption.getOrElse(String.valueOf(config.defaultValue())))).toBoolean
+ }
+}
+
+class HoodieMergeOnReadSnapshotHadoopFsRelationFactory(override val
sqlContext: SQLContext,
+ override val
metaClient: HoodieTableMetaClient,
+ override val options:
Map[String, String],
+ override val
schemaSpec: Option[StructType],
+ isBootstrap: Boolean)
+ extends HoodieBaseHadoopFsRelationFactory(sqlContext, metaClient, options,
schemaSpec) {
+
+ val fileIndex: HoodieFileIndex = new HoodieFileIndex(
+ sparkSession,
+ metaClient,
+ Some(tableStructSchema),
+ optParams,
+ FileStatusCache.getOrCreate(sparkSession),
+ includeLogFiles = true,
+ shouldEmbedFileSlices = true)
+
+ val configProperties: TypedProperties = getConfigProperties(sparkSession,
options)
+ val metadataConfig: HoodieMetadataConfig = HoodieMetadataConfig.newBuilder
+ .fromProperties(configProperties)
+ .enable(configProperties.getBoolean(ENABLE.key,
DEFAULT_METADATA_ENABLE_FOR_READERS)
+ && HoodieTableMetadataUtil.isFilesPartitionAvailable(metaClient)).build
+
+ val tableState: HoodieTableState = // Subset of the state of table's
configuration as of at the time of the query
+ HoodieTableState(
+ tablePath = basePath.toString,
+ latestCommitTimestamp = queryTimestamp,
+ recordKeyField = recordKeyField,
+ preCombineFieldOpt = preCombineFieldOpt,
+ usesVirtualKeys = !tableConfig.populateMetaFields(),
+ recordPayloadClassName = tableConfig.getPayloadClass,
+ metadataConfig = metadataConfig,
+ recordMergerImpls = recordMergerImpls,
+ recordMergerStrategy = recordMergerStrategy
+ )
+
+ val mandatoryFields: Seq[String] = mandatoryFieldsForMerging
+ val fileGroupReaderBasedFileFormat = new
HoodieFileGroupReaderBasedParquetFileFormat(
+ tableState, HoodieTableSchema(tableStructSchema, tableAvroSchema.toString,
internalSchemaOpt),
+ metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
+ true, isBootstrap, false, shouldUseRecordPosition, Seq.empty)
+
+ val newHoodieParquetFileFormat = new
NewHoodieParquetFileFormat(sparkSession.sparkContext.broadcast(tableState),
+ sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema,
tableAvroSchema.toString, internalSchemaOpt)),
+ metaClient.getTableConfig.getTableName, mergeType, mandatoryFields, true,
isBootstrap, false, Seq.empty)
+
+ val multipleBaseFileFormat = new
HoodieMultipleBaseFileFormat(sparkSession.sparkContext.broadcast(tableState),
+ sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema,
tableAvroSchema.toString, internalSchemaOpt)),
+ metaClient.getTableConfig.getTableName, mergeType, mandatoryFields, true,
false, Seq.empty)
+
+ override def buildFileIndex(): FileIndex = fileIndex
+
+ override def buildFileFormat(): FileFormat = {
+ if (fileGroupReaderEnabled) {
+ fileGroupReaderBasedFileFormat
+ } else if (metaClient.getTableConfig.isMultipleBaseFileFormatsEnabled &&
!isBootstrap) {
+ multipleBaseFileFormat
+ } else {
+ newHoodieParquetFileFormat
+ }
+ }
+
+ override def buildDataSchema(): StructType = fileIndex.dataSchema
+
+ override def buildPartitionSchema(): StructType = fileIndex.partitionSchema
+
+ override def buildBucketSpec(): Option[BucketSpec] = None
+
+ override def buildOptions(): Map[String, String] = optParams
+ override def build(): HadoopFsRelation = {
+ HadoopFsRelation(
+ location = buildFileIndex(),
+ partitionSchema = buildPartitionSchema(),
+ dataSchema = buildDataSchema(),
+ bucketSpec = buildBucketSpec(),
+ fileFormat = buildFileFormat(),
+ options = buildOptions())(sparkSession)
+ }
+}
+
+class HoodieMergeOnReadIncrementalHadoopFsRelationFactory(override val
sqlContext: SQLContext,
+ override val
metaClient: HoodieTableMetaClient,
+ override val
options: Map[String, String],
+ override val
schemaSpec: Option[StructType],
+ isBootstrap: Boolean)
+ extends HoodieMergeOnReadSnapshotHadoopFsRelationFactory(sqlContext,
metaClient, options, schemaSpec, isBootstrap) {
+
+ override val fileIndex = new HoodieIncrementalFileIndex(
+ sparkSession, metaClient, schemaSpec, options,
FileStatusCache.getOrCreate(sparkSession), true)
+
+ override val fileGroupReaderBasedFileFormat = new
HoodieFileGroupReaderBasedParquetFileFormat(
+ tableState, HoodieTableSchema(tableStructSchema, tableAvroSchema.toString,
internalSchemaOpt),
+ metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
+ true, isBootstrap, true, shouldUseRecordPosition,
fileIndex.getRequiredFilters)
+
+ override val newHoodieParquetFileFormat = new
NewHoodieParquetFileFormat(sparkSession.sparkContext.broadcast(tableState),
+ sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema,
tableAvroSchema.toString, internalSchemaOpt)),
+ metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
+ true, isBootstrap, true, fileIndex.getRequiredFilters)
+
+ override val multipleBaseFileFormat = new
HoodieMultipleBaseFileFormat(sparkSession.sparkContext.broadcast(tableState),
+ sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema,
tableAvroSchema.toString, internalSchemaOpt)),
+ metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
+ true, true, fileIndex.getRequiredFilters)
+}
+
+class HoodieCopyOnWriteSnapshotHadoopFsRelationFactory(override val
sqlContext: SQLContext,
+ override val
metaClient: HoodieTableMetaClient,
+ override val options:
Map[String, String],
+ override val
schemaSpec: Option[StructType],
+ isBootstrap: Boolean)
+ extends HoodieMergeOnReadSnapshotHadoopFsRelationFactory(sqlContext,
metaClient, options, schemaSpec, isBootstrap) {
+ override val fileIndex: HoodieFileIndex = HoodieFileIndex(
+ sparkSession,
+ metaClient,
+ Some(tableStructSchema),
+ optParams,
+ FileStatusCache.getOrCreate(sparkSession),
+ shouldEmbedFileSlices = true)
+
+ override val fileGroupReaderBasedFileFormat = new
HoodieFileGroupReaderBasedParquetFileFormat(
+ tableState, HoodieTableSchema(tableStructSchema, tableAvroSchema.toString,
internalSchemaOpt),
+ metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
+ false, isBootstrap, false, shouldUseRecordPosition, Seq.empty)
+
+ override val newHoodieParquetFileFormat = new
NewHoodieParquetFileFormat(sparkSession.sparkContext.broadcast(tableState),
+ sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema,
tableAvroSchema.toString, internalSchemaOpt)),
+ metaClient.getTableConfig.getTableName, mergeType, mandatoryFields, false,
isBootstrap, false, Seq.empty)
+
+ override val multipleBaseFileFormat = new
HoodieMultipleBaseFileFormat(sparkSession.sparkContext.broadcast(tableState),
+ sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema,
tableAvroSchema.toString, internalSchemaOpt)),
+ metaClient.getTableConfig.getTableName, mergeType, mandatoryFields, false,
false, Seq.empty)
+}
+
+class HoodieCopyOnWriteIncrementalHadoopFsRelationFactory(override val
sqlContext: SQLContext,
+ override val
metaClient: HoodieTableMetaClient,
+ override val
options: Map[String, String],
+ override val
schemaSpec: Option[StructType],
+ isBootstrap: Boolean)
+ extends HoodieCopyOnWriteSnapshotHadoopFsRelationFactory(sqlContext,
metaClient, options, schemaSpec, isBootstrap) {
+
+ override val fileIndex = new HoodieIncrementalFileIndex(
+ sparkSession, metaClient, schemaSpec, options,
FileStatusCache.getOrCreate(sparkSession), false)
+
+ override val fileGroupReaderBasedFileFormat = new
HoodieFileGroupReaderBasedParquetFileFormat(
+ tableState, HoodieTableSchema(tableStructSchema, tableAvroSchema.toString,
internalSchemaOpt),
+ metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
+ false, isBootstrap, true, shouldUseRecordPosition,
fileIndex.getRequiredFilters)
+
+ override val newHoodieParquetFileFormat = new
NewHoodieParquetFileFormat(sparkSession.sparkContext.broadcast(tableState),
+ sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema,
tableAvroSchema.toString, internalSchemaOpt)),
+ metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
+ false, isBootstrap, true, fileIndex.getRequiredFilters)
+
+ override val multipleBaseFileFormat = new
HoodieMultipleBaseFileFormat(sparkSession.sparkContext.broadcast(tableState),
+ sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema,
tableAvroSchema.toString, internalSchemaOpt)),
+ metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
+ false, true, fileIndex.getRequiredFilters)
+}
+
+class HoodieMergeOnReadCDCHadoopFsRelationFactory(override val sqlContext:
SQLContext,
+ override val metaClient:
HoodieTableMetaClient,
+ override val options:
Map[String, String],
+ override val schemaSpec:
Option[StructType],
+ isBootstrap: Boolean)
+ extends HoodieMergeOnReadIncrementalHadoopFsRelationFactory(sqlContext,
metaClient, options, schemaSpec, isBootstrap) {
+ override val fileIndex = new HoodieCDCFileIndex(
+ sparkSession, metaClient, schemaSpec, options,
FileStatusCache.getOrCreate(sparkSession), true)
+}
+
+class HoodieCopyOnWriteCDCHadoopFsRelationFactory(override val sqlContext:
SQLContext,
+ override val metaClient:
HoodieTableMetaClient,
+ override val options:
Map[String, String],
+ override val schemaSpec:
Option[StructType],
+ isBootstrap: Boolean)
+ extends HoodieCopyOnWriteIncrementalHadoopFsRelationFactory(sqlContext,
metaClient, options, schemaSpec, isBootstrap) {
+ override val fileIndex = new HoodieCDCFileIndex(
+ sparkSession, metaClient, schemaSpec, options,
FileStatusCache.getOrCreate(sparkSession), false)
+}
+
+
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieIncrementalFileIndex.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieIncrementalFileIndex.scala
new file mode 100644
index 00000000000..b977c51ab67
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieIncrementalFileIndex.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hadoop.fs.FileStatus
+import org.apache.hudi.common.model.{FileSlice, HoodieLogFile}
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.util.JFunction
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.execution.datasources.{FileIndex, FileStatusCache,
NoopCache, PartitionDirectory}
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+
+import java.util.stream.Collectors
+import scala.jdk.CollectionConverters.asScalaBufferConverter
+
/**
 * [[FileIndex]] implementation backing incremental Hudi queries served through
 * HadoopFsRelation.
 *
 * File listing is delegated to [[MergeOnReadIncrementalRelation.listFileSplits]],
 * which resolves the file slices touched by the commits in the queried range,
 * keyed by partition values.
 */
class HoodieIncrementalFileIndex(override val spark: SparkSession,
                                 override val metaClient: HoodieTableMetaClient,
                                 override val schemaSpec: Option[StructType],
                                 override val options: Map[String, String],
                                 @transient override val fileStatusCache: FileStatusCache = NoopCache,
                                 override val includeLogFiles: Boolean)
  extends HoodieFileIndex(
    spark, metaClient, schemaSpec, options, fileStatusCache, includeLogFiles
  ) with FileIndex {
  val mergeOnReadIncrementalRelation: MergeOnReadIncrementalRelation = MergeOnReadIncrementalRelation(
    spark.sqlContext, options, metaClient, schemaSpec, schemaSpec)

  override def listFiles(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
    val fileSlices = mergeOnReadIncrementalRelation.listFileSplits(partitionFilters, dataFilters)
    hasPushedDownPartitionPredicates = true
    // BUGFIX: the original evaluated `Seq.empty` here without returning it (a
    // discarded expression), so an empty listing fell through and — on the
    // non-partitioned path below — was wrapped in a spurious
    // PartitionDirectory(InternalRow.empty, Nil). Return an empty seq outright.
    if (fileSlices.isEmpty) {
      Seq.empty
    } else {
      val prunedPartitionsAndFilteredFileSlices = fileSlices.map {
        // NOTE: inner binding renamed from `fileSlices` to avoid shadowing the
        // outer map of the same name.
        case (partitionValues, slices) =>
          if (shouldEmbedFileSlices) {
            // One representative status per slice: the base file if present,
            // otherwise any log file; slices with neither are dropped.
            val representativeStatuses: Seq[FileStatus] = slices.flatMap { slice =>
              if (slice.getBaseFile.isPresent) {
                Some(slice.getBaseFile.get().getFileStatus)
              } else if (slice.getLogFiles.findAny().isPresent) {
                Some(slice.getLogFiles.findAny().get().getFileStatus)
              } else {
                None
              }
            }
            // Slices that require merge-on-read handling (log files, or a
            // bootstrap base file) are embedded into the partition values so
            // the file format can recover them by file id.
            val slicesToEmbed = slices.filter(f => f.getLogFiles.findAny().isPresent
                || (f.getBaseFile.isPresent && f.getBaseFile.get().getBootstrapBaseFile.isPresent))
              .map(f => f.getFileId -> f).toMap
            if (slicesToEmbed.nonEmpty) {
              PartitionDirectory(new HoodiePartitionFileSliceMapping(partitionValues, slicesToEmbed), representativeStatuses)
            } else {
              PartitionDirectory(partitionValues, representativeStatuses)
            }
          } else {
            PartitionDirectory(partitionValues, slices.flatMap(sliceFileStatuses))
          }
      }.toSeq

      if (shouldReadAsPartitionedTable()) {
        prunedPartitionsAndFilteredFileSlices
      } else if (shouldEmbedFileSlices) {
        assert(partitionSchema.isEmpty)
        prunedPartitionsAndFilteredFileSlices
      } else {
        // Collapse into a single non-partitioned directory.
        Seq(PartitionDirectory(InternalRow.empty, prunedPartitionsAndFilteredFileSlices.flatMap(_.files)))
      }
    }
  }

  override def inputFiles: Array[String] = {
    // NOTE: the original also evaluated a discarded `Array.empty` guard here;
    // flattening an empty map already yields an empty array, so no guard is needed.
    val fileSlices = mergeOnReadIncrementalRelation.listFileSplits(Seq.empty, Seq.empty)
    fileSlices.values.flatten
      .flatMap(sliceFileStatuses)
      .map(_.getPath.toString)
      .toArray
  }

  /**
   * Collects the statuses of a slice's base file (if any, via the parent's
   * `getBaseFileStatus`) plus, when `includeLogFiles` is set, all of its log files.
   */
  private def sliceFileStatuses(fs: FileSlice): Seq[FileStatus] = {
    val baseFileStatusOpt = getBaseFileStatus(Option.apply(fs.getBaseFile.orElse(null)))
    val logFileStatuses = if (includeLogFiles) {
      fs.getLogFiles.map[FileStatus](JFunction.toJavaFunction[HoodieLogFile, FileStatus](lf => lf.getFileStatus))
    } else {
      java.util.stream.Stream.empty[FileStatus]()
    }
    val files = logFileStatuses.collect(Collectors.toList[FileStatus]).asScala
    baseFileStatusOpt.foreach(f => files.append(f))
    files
  }

  /** Record-level filters the reader must apply to bound rows to the incremental span. */
  def getRequiredFilters: Seq[Filter] = {
    mergeOnReadIncrementalRelation.getRequiredFilters
  }
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodiePartitionCDCFileGroupMapping.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodiePartitionCDCFileGroupMapping.scala
new file mode 100644
index 00000000000..bd052c086ff
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodiePartitionCDCFileGroupMapping.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.common.model.HoodieFileGroupId
+import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
+import org.apache.spark.sql.types.{DataType, Decimal}
+import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
+
+import java.util
+
/**
 * Partition-values row that additionally carries, per CDC-enabled file group,
 * the CDC file splits to read for that group.
 *
 * Spark treats this as an ordinary partition [[InternalRow]]; Hudi's CDC-aware
 * readers pattern-match on this concrete type to recover the mapping.
 */
class HoodiePartitionCDCFileGroupMapping(partitionValues: InternalRow,
                                         fileGroups: Map[HoodieFileGroupId, List[HoodieCDCFileSplit]]
                                        )
  extends HoodiePartitionValues(partitionValues) {

  /** Returns the CDC file splits of the given file group, if it belongs to this partition. */
  def getFileSplitsFor(fileGroupId: HoodieFileGroupId): Option[List[HoodieCDCFileSplit]] = {
    fileGroups.get(fileGroupId)
  }

  /**
   * BUGFIX: preserve the file-group mapping on copy. The inherited
   * [[HoodiePartitionValues.copy]] returns a plain HoodiePartitionValues,
   * which would silently drop the CDC metadata (and defeat downstream
   * pattern matches) if Spark copies the partition row.
   */
  override def copy(): InternalRow =
    new HoodiePartitionCDCFileGroupMapping(partitionValues.copy(), fileGroups)
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodiePartitionFileSliceMapping.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodiePartitionFileSliceMapping.scala
new file mode 100644
index 00000000000..4121a41b8d8
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodiePartitionFileSliceMapping.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.common.model.FileSlice
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
+import org.apache.spark.sql.types.{DataType, Decimal}
+import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
+
/**
 * Partition-values row that additionally carries the file slices of the
 * partition, keyed by file id, so merge-on-read file formats can look up the
 * slice belonging to a base file.
 */
class HoodiePartitionFileSliceMapping(values: InternalRow,
                                      slices: Map[String, FileSlice])
  extends HoodiePartitionValues(values) {

  /** Returns the file slice for the given file id, if it is embedded in this partition. */
  def getSlice(fileId: String): Option[FileSlice] = {
    slices.get(fileId)
  }

  /** The raw partition values without the slice mapping. */
  def getPartitionValues: InternalRow = values

  /**
   * BUGFIX: preserve `slices` on copy, as the removed PartitionFileSliceMapping
   * did. The inherited [[HoodiePartitionValues.copy]] would degrade a copy to a
   * plain HoodiePartitionValues, losing the mapping.
   */
  override def copy(): InternalRow =
    new HoodiePartitionFileSliceMapping(values.copy(), slices)
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodiePartitionValues.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodiePartitionValues.scala
new file mode 100644
index 00000000000..3587cc6bd26
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodiePartitionValues.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
+import org.apache.spark.sql.types.{DataType, Decimal}
+import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
+
/**
 * An [[InternalRow]] that forwards every accessor and mutator to the wrapped
 * partition-values row.
 *
 * Acts as the common base for partition rows that smuggle extra Hudi metadata
 * (file-slice or CDC file-group mappings) through Spark's partition handling:
 * Spark sees an ordinary row, while Hudi readers pattern-match on the concrete
 * subtype. NOTE: subclasses should override `copy()` themselves, otherwise a
 * copy degrades to a plain HoodiePartitionValues.
 */
case class HoodiePartitionValues(values: InternalRow) extends InternalRow {

  override def numFields: Int = values.numFields

  override def setNullAt(i: Int): Unit = values.setNullAt(i)

  override def update(i: Int, value: Any): Unit = values.update(i, value)

  override def copy(): InternalRow = HoodiePartitionValues(values.copy())

  override def isNullAt(ordinal: Int): Boolean = values.isNullAt(ordinal)

  override def getBoolean(ordinal: Int): Boolean = values.getBoolean(ordinal)

  override def getByte(ordinal: Int): Byte = values.getByte(ordinal)

  override def getShort(ordinal: Int): Short = values.getShort(ordinal)

  override def getInt(ordinal: Int): Int = values.getInt(ordinal)

  override def getLong(ordinal: Int): Long = values.getLong(ordinal)

  override def getFloat(ordinal: Int): Float = values.getFloat(ordinal)

  override def getDouble(ordinal: Int): Double = values.getDouble(ordinal)

  override def getDecimal(ordinal: Int, precision: Int, scale: Int): Decimal =
    values.getDecimal(ordinal, precision, scale)

  override def getUTF8String(ordinal: Int): UTF8String = values.getUTF8String(ordinal)

  override def getBinary(ordinal: Int): Array[Byte] = values.getBinary(ordinal)

  override def getInterval(ordinal: Int): CalendarInterval = values.getInterval(ordinal)

  override def getStruct(ordinal: Int, numFields: Int): InternalRow = values.getStruct(ordinal, numFields)

  override def getArray(ordinal: Int): ArrayData = values.getArray(ordinal)

  override def getMap(ordinal: Int): MapData = values.getMap(ordinal)

  override def get(ordinal: Int, dataType: DataType): AnyRef = values.get(ordinal, dataType)
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkFileFormatUtils.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkFileFormatUtils.scala
index e66b248e0ab..fe1645d3b60 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkFileFormatUtils.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkFileFormatUtils.scala
@@ -178,8 +178,8 @@ class HoodieSparkFileFormatUtils(val sqlContext: SQLContext,
def hasSchemaOnRead: Boolean = internalSchemaOpt.isDefined
def getHadoopFsRelation(isMOR: Boolean, isBootstrap: Boolean): BaseRelation
= {
-
- val fileIndex = HoodieFileIndex(sparkSession, metaClient,
Some(tableStructSchema), optParams, FileStatusCache.getOrCreate(sparkSession),
isMOR)
+ val fileIndex = HoodieFileIndex(
+ sparkSession, metaClient, Some(tableStructSchema), optParams,
FileStatusCache.getOrCreate(sparkSession), isMOR, shouldEmbedFileSlices = true)
val recordMergerImpls =
ConfigUtils.split2List(getConfigValue(HoodieWriteConfig.RECORD_MERGER_IMPLS)).asScala.toList
val recordMergerStrategy =
getConfigValue(HoodieWriteConfig.RECORD_MERGER_STRATEGY,
Option(metaClient.getTableConfig.getRecordMergerStrategy))
@@ -205,20 +205,21 @@ class HoodieSparkFileFormatUtils(val sqlContext:
SQLContext,
} else {
Seq.empty
}
- fileIndex.shouldEmbedFileSlices = true
val fileFormat = if (fileGroupReaderEnabled) {
new HoodieFileGroupReaderBasedParquetFileFormat(
tableState, HoodieTableSchema(tableStructSchema,
tableAvroSchema.toString, internalSchemaOpt),
- metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
isMOR, isBootstrap, shouldUseRecordPosition)
+ metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
+ isMOR, isBootstrap, false, shouldUseRecordPosition, Seq.empty)
} else if (metaClient.getTableConfig.isMultipleBaseFileFormatsEnabled &&
!isBootstrap) {
new
HoodieMultipleBaseFileFormat(sparkSession.sparkContext.broadcast(tableState),
sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema,
tableAvroSchema.toString, internalSchemaOpt)),
- metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
isMOR)
+ metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
isMOR, false, Seq.empty)
} else {
new
NewHoodieParquetFileFormat(sparkSession.sparkContext.broadcast(tableState),
sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema,
tableAvroSchema.toString, internalSchemaOpt)),
- metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
isMOR, isBootstrap)
+ metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
+ isMOR, isBootstrap, false, Seq.empty)
}
HadoopFsRelation(
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
index 4dda08c2e28..44d937f22ad 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
@@ -23,7 +23,7 @@ import
org.apache.hudi.HoodieSparkConfUtils.getHollowCommitHandling
import org.apache.hudi.common.model.{FileSlice, HoodieRecord}
import org.apache.hudi.common.table.HoodieTableMetaClient
import
org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling.USE_TRANSITION_TIME
-import org.apache.hudi.common.table.timeline.TimelineUtils.{concatTimeline,
HollowCommitHandling, getCommitMetadata, handleHollowCommitIfNeeded}
+import
org.apache.hudi.common.table.timeline.TimelineUtils.{HollowCommitHandling,
concatTimeline, getCommitMetadata, handleHollowCommitIfNeeded}
import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
import org.apache.hudi.common.table.view.HoodieTableFileSystemView
import org.apache.hudi.common.util.StringUtils
@@ -33,6 +33,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.execution.datasources.PartitionDirectory
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.StructType
@@ -119,6 +120,37 @@ case class MergeOnReadIncrementalRelation(override val
sqlContext: SQLContext,
}
}
  /**
   * Lists the file slices touched by the commits in the incremental query
   * range, grouped by the partition values derived from each slice's files.
   *
   * @param partitionFilters partition-pruning predicates (only applied on the
   *                         full-table-scan path via `listLatestFileSlices`)
   * @param dataFilters      data predicates (likewise only used for full scans)
   * @return map of partition-values row -> file slices under that partition;
   *         empty when no commits fall inside the queried range
   */
  def listFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Map[InternalRow, Seq[FileSlice]] = {
    val slices = if (includedCommits.isEmpty) {
      // Nothing committed in the requested range: nothing to read.
      List()
    } else {
      val fileSlices = if (fullTableScan) {
        listLatestFileSlices(Seq(), partitionFilters, dataFilters)
      } else {
        // Build a file-system view restricted to the files affected by the
        // included commits, then take the latest merged slice per file group
        // as of the last included commit, across all written partitions.
        val latestCommit = includedCommits.last.getTimestamp
        val fsView = new HoodieTableFileSystemView(metaClient, timeline, affectedFilesInCommits)
        val modifiedPartitions = getWritePartitionPaths(commitsMetadata)

        modifiedPartitions.asScala.flatMap { relativePartitionPath =>
          fsView.getLatestMergedFileSlicesBeforeOrOn(relativePartitionPath, latestCommit).iterator().asScala
        }.toSeq
      }
      filterFileSlices(fileSlices, globPattern)
    }

    // Group by partition values computed from any physical file of the slice.
    // NOTE(review): assumes every slice has a base file or at least one log
    // file — `findAny.get` throws otherwise; confirm this invariant holds for
    // slices returned by the views above.
    slices.groupBy(fs => {
      if (fs.getBaseFile.isPresent) {
        getPartitionColumnValuesAsInternalRow(fs.getBaseFile.get.getFileStatus)
      } else {
        getPartitionColumnValuesAsInternalRow(fs.getLogFiles.findAny.get.getFileStatus)
      }
    })
  }
+
+ def getRequiredFilters: Seq[Filter] = {
+ incrementalSpanRecordFilters
+ }
+
private def filterFileSlices(fileSlices: Seq[FileSlice], pathGlobPattern:
String): Seq[FileSlice] = {
val filteredFileSlices = if (!StringUtils.isNullOrEmpty(pathGlobPattern)) {
val globMatcher = new GlobPattern("*" + pathGlobPattern)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionFileSliceMapping.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionFileSliceMapping.scala
deleted file mode 100644
index 1e639f0daab..00000000000
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionFileSliceMapping.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hudi
-
-import org.apache.hudi.common.model.FileSlice
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
-import org.apache.spark.sql.types.{DataType, Decimal}
-import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
-
-class PartitionFileSliceMapping(internalRow: InternalRow,
- slices: Map[String, FileSlice]) extends
InternalRow {
-
- def getSlice(fileId: String): Option[FileSlice] = {
- slices.get(fileId)
- }
-
- def getInternalRow: InternalRow = internalRow
-
- override def numFields: Int = internalRow.numFields
-
- override def setNullAt(i: Int): Unit = internalRow.setNullAt(i)
-
- override def update(i: Int, value: Any): Unit = internalRow.update(i, value)
-
- override def copy(): InternalRow = new
PartitionFileSliceMapping(internalRow.copy(), slices)
-
- override def isNullAt(ordinal: Int): Boolean = internalRow.isNullAt(ordinal)
-
- override def getBoolean(ordinal: Int): Boolean =
internalRow.getBoolean(ordinal)
-
- override def getByte(ordinal: Int): Byte = internalRow.getByte(ordinal)
-
- override def getShort(ordinal: Int): Short = internalRow.getShort(ordinal)
-
- override def getInt(ordinal: Int): Int = internalRow.getInt(ordinal)
-
- override def getLong(ordinal: Int): Long = internalRow.getLong(ordinal)
-
- override def getFloat(ordinal: Int): Float = internalRow.getFloat(ordinal)
-
- override def getDouble(ordinal: Int): Double = internalRow.getDouble(ordinal)
-
- override def getDecimal(ordinal: Int, precision: Int, scale: Int): Decimal =
internalRow.getDecimal(ordinal, precision, scale)
-
- override def getUTF8String(ordinal: Int): UTF8String =
internalRow.getUTF8String(ordinal)
-
- override def getBinary(ordinal: Int): Array[Byte] =
internalRow.getBinary(ordinal)
-
- override def getInterval(ordinal: Int): CalendarInterval =
internalRow.getInterval(ordinal)
-
- override def getStruct(ordinal: Int, numFields: Int): InternalRow =
internalRow.getStruct(ordinal, numFields)
-
- override def getArray(ordinal: Int): ArrayData =
internalRow.getArray(ordinal)
-
- override def getMap(ordinal: Int): MapData = internalRow.getMap(ordinal)
-
- override def get(ordinal: Int, dataType: DataType): AnyRef =
internalRow.get(ordinal, dataType)
-}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
new file mode 100644
index 00000000000..a08014a4008
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
@@ -0,0 +1,558 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.cdc
+
+import org.apache.avro.Schema
+import org.apache.avro.generic.{GenericData, GenericRecord, IndexedRecord}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.HoodieBaseRelation.BaseFileReader
+import org.apache.hudi.HoodieConversionUtils.toScalaOption
+import org.apache.hudi.HoodieDataSourceHelper.AvroDeserializerSupport
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.{AvroConversionUtils, AvroProjection, HoodieFileIndex,
HoodieMergeOnReadFileSplit, HoodieTableSchema, HoodieTableState,
LogFileIterator, RecordMergingFileIterator, SparkAdapterSupport}
+import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
+import org.apache.hudi.common.model.{FileSlice, HoodieAvroRecordMerger,
HoodieLogFile, HoodieRecord, HoodieRecordMerger, HoodieRecordPayload}
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.cdc.{HoodieCDCFileSplit, HoodieCDCUtils}
+import org.apache.hudi.common.table.cdc.HoodieCDCInferenceCase._
+import org.apache.hudi.common.table.log.HoodieCDCLogRecordIterator
+import org.apache.hudi.common.table.cdc.HoodieCDCOperation._
+import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode._
+import org.apache.hudi.common.util.ValidationUtils.checkState
+import org.apache.hudi.config.HoodiePayloadConfig
+import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
+import
org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection
+import org.apache.spark.sql.avro.HoodieAvroDeserializer
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Projection
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+import java.io.Closeable
+import java.util.Properties
+import java.util.stream.Collectors
+import scala.annotation.tailrec
+import scala.collection.mutable
+import scala.jdk.CollectionConverters.asScalaBufferConverter
+
+class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
+ metaClient: HoodieTableMetaClient,
+ conf: Configuration,
+ parquetReader: PartitionedFile =>
Iterator[InternalRow],
+ originTableSchema: HoodieTableSchema,
+ cdcSchema: StructType,
+ requiredCdcSchema: StructType,
+ props: TypedProperties)
+ extends Iterator[InternalRow]
+ with SparkAdapterSupport with AvroDeserializerSupport with Closeable {
+
+ protected val payloadProps: Properties =
Option(metaClient.getTableConfig.getPreCombineField)
+ .map { preCombineField =>
+ HoodiePayloadConfig.newBuilder
+ .withPayloadOrderingField(preCombineField)
+ .build
+ .getProps
+ }.getOrElse(new Properties())
+
+ private lazy val fs = metaClient.getFs.getFileSystem
+
+ private lazy val basePath = metaClient.getBasePathV2
+
+ private lazy val tableConfig = metaClient.getTableConfig
+
+ private lazy val populateMetaFields = tableConfig.populateMetaFields()
+
+ private lazy val keyGenerator = {
+ HoodieSparkKeyGeneratorFactory.createKeyGenerator(tableConfig.getProps())
+ }
+
+ private lazy val recordKeyField: String = if (populateMetaFields) {
+ HoodieRecord.RECORD_KEY_METADATA_FIELD
+ } else {
+ val keyFields = metaClient.getTableConfig.getRecordKeyFields.get()
+ checkState(keyFields.length == 1)
+ keyFields.head
+ }
+
+ private lazy val preCombineFieldOpt: Option[String] =
Option(metaClient.getTableConfig.getPreCombineField)
+
+ private lazy val tableState = {
+ val metadataConfig = HoodieMetadataConfig.newBuilder()
+ .fromProperties(props)
+ .build()
+ HoodieTableState(
+ pathToString(basePath),
+ Some(split.changes.last.getInstant),
+ recordKeyField,
+ preCombineFieldOpt,
+ usesVirtualKeys = !populateMetaFields,
+ metaClient.getTableConfig.getPayloadClass,
+ metadataConfig,
+ // TODO support CDC with spark record
+ recordMergerImpls = List(classOf[HoodieAvroRecordMerger].getName),
+ recordMergerStrategy = HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID
+ )
+ }
+
+ protected override val avroSchema: Schema = new
Schema.Parser().parse(originTableSchema.avroSchemaStr)
+
+ protected override val structTypeSchema: StructType =
originTableSchema.structTypeSchema
+
+ private val cdcSupplementalLoggingMode =
metaClient.getTableConfig.cdcSupplementalLoggingMode
+
+ private lazy val serializer =
sparkAdapter.createAvroSerializer(originTableSchema.structTypeSchema,
+ avroSchema, nullable = false)
+
+ private lazy val avroProjection = AvroProjection.create(avroSchema)
+
+ private lazy val cdcAvroSchema: Schema =
HoodieCDCUtils.schemaBySupplementalLoggingMode(
+ cdcSupplementalLoggingMode,
+ HoodieAvroUtils.removeMetadataFields(avroSchema)
+ )
+
+ private lazy val cdcSparkSchema: StructType =
AvroConversionUtils.convertAvroSchemaToStructType(cdcAvroSchema)
+
+ private lazy val sparkPartitionedFileUtils =
sparkAdapter.getSparkPartitionedFileUtils
+
+ /**
+ * The deserializer used to convert the CDC GenericRecord to Spark
InternalRow.
+ */
+ private lazy val cdcRecordDeserializer: HoodieAvroDeserializer = {
+ sparkAdapter.createAvroDeserializer(cdcAvroSchema, cdcSparkSchema)
+ }
+
+ private lazy val projection: Projection =
generateUnsafeProjection(cdcSchema, requiredCdcSchema)
+
+ // Iterator on cdc file
+ private val cdcFileIter = split.changes.iterator
+
+ // The instant that is currently being processed
+ private var currentInstant: String = _
+
+ // The change file that is currently being processed
+ private var currentCDCFileSplit: HoodieCDCFileSplit = _
+
+  /**
+   * Two cases use this to iterate over the records:
+   * 1) extracting the change data from the base file directly, covering 'BASE_FILE_INSERT' and 'BASE_FILE_DELETE';
+   * 2) when the type of the cdc file is 'REPLACE_COMMIT',
+   *    tracing the records that are converted from [[beforeImageRecords]].
+   */
+ private var recordIter: Iterator[InternalRow] = Iterator.empty
+
+  /**
+   * Used in only one case: extracting the change data from log files of a MOR table.
+   * In that case, 'logRecordIter' works together with [[beforeImageRecords]], which keeps
+   * all the records of the previous file slice.
+   */
+ private var logRecordIter: Iterator[(String, HoodieRecord[_])] =
Iterator.empty
+
+  /**
+   * Used in only one case: extracting the change data from cdc log files.
+   */
+ private var cdcLogRecordIterator: HoodieCDCLogRecordIterator = _
+
+ /**
+ * The next record need to be returned when call next().
+ */
+ protected var recordToLoad: InternalRow = _
+
+ /**
+ * The list of files to which 'beforeImageRecords' belong.
+ * Use it to determine if 'beforeImageRecords' contains all the required
data that extract
+ * the change data from the current cdc file.
+ */
+ private val beforeImageFiles: mutable.ArrayBuffer[String] =
mutable.ArrayBuffer.empty
+
+  /**
+   * Keeps the before-image data. It is used in these cases:
+   * 1) the cdc infer case is [[LOG_FILE]];
+   * 2) the cdc infer case is [[AS_IS]] and [[cdcSupplementalLoggingMode]] is
+   *    'op_key'.
+   */
+ private var beforeImageRecords: mutable.Map[String, GenericRecord] =
mutable.Map.empty
+
+ /**
+ * Keep the after-image data. Only one case will use this:
+ * the cdc infer case is [[AS_IS]] and [[cdcSupplementalLoggingMode]] is
[[OP_KEY_ONLY]] or [[DATA_BEFORE]].
+ */
+ private var afterImageRecords: mutable.Map[String, InternalRow] =
mutable.Map.empty
+
+ private var internalRowToJsonStringConverter = new
InternalRowToJsonStringConverter(originTableSchema)
+
+ private def needLoadNextFile: Boolean = {
+ !recordIter.hasNext &&
+ !logRecordIter.hasNext &&
+ (cdcLogRecordIterator == null || !cdcLogRecordIterator.hasNext)
+ }
+
  /**
   * Advances until one CDC row has been materialized into [[recordToLoad]]
   * (returning true) or all file splits are exhausted (returning false).
   *
   * Tail-recursive: each pass loads the next CDC file split if all iterators
   * are drained, then tries to materialize one record from the iterator that
   * corresponds to the split's inference case. `loadNext()` may return false
   * (e.g. a skipped log record), in which case we recurse.
   */
  @tailrec final def hasNextInternal: Boolean = {
    if (needLoadNextFile) {
      loadCdcFile()
    }
    if (currentCDCFileSplit == null) {
      // Presumably loadCdcFile() leaves this null when no split remains —
      // iteration is complete (TODO confirm against loadCdcFile).
      false
    } else {
      currentCDCFileSplit.getCdcInferCase match {
        case BASE_FILE_INSERT | BASE_FILE_DELETE | REPLACE_COMMIT =>
          // Change data extracted directly from base-file records.
          if (recordIter.hasNext && loadNext()) {
            true
          } else {
            hasNextInternal
          }
        case LOG_FILE =>
          // Change data inferred by merging log records with before-images.
          if (logRecordIter.hasNext && loadNext()) {
            true
          } else {
            hasNextInternal
          }
        case AS_IS =>
          // Change data read from persisted CDC log files.
          if (cdcLogRecordIterator.hasNext && loadNext()) {
            true
          } else {
            hasNextInternal
          }
      }
    }
  }
+
  override def hasNext: Boolean = hasNextInternal

  // NOTE(review): assumes hasNext was called and returned true so that
  // recordToLoad holds the next materialized CDC row — confirm all callers
  // obey the Iterator contract. Projects the full CDC row down to the
  // required output schema.
  override final def next(): InternalRow = {
    projection(recordToLoad)
  }
+
  /**
   * Materializes one CDC row into [[recordToLoad]] for the current file split.
   *
   * CDC row ordinals written below: 0 = change operation, 2 = before-image
   * JSON, 3 = after-image JSON. Ordinal 1 (presumably the commit instant) is
   * not written here — it appears to be filled in elsewhere, e.g. when
   * recordToLoad is (re)initialized per split; TODO confirm.
   *
   * @return true when a row was produced; false when the current record was
   *         skipped (only possible via [[loadNextLogRecord]])
   */
  def loadNext(): Boolean = {
    var loaded = false
    currentCDCFileSplit.getCdcInferCase match {
      case BASE_FILE_INSERT =>
        // Entire base file is new: every record is an insert (after-image only).
        val originRecord = recordIter.next()
        recordToLoad.update(3, convertRowToJsonString(originRecord))
        loaded = true
      case BASE_FILE_DELETE =>
        // Entire base file was removed: every record is a delete (before-image only).
        val originRecord = recordIter.next()
        recordToLoad.update(2, convertRowToJsonString(originRecord))
        loaded = true
      case LOG_FILE =>
        // MOR log file: merge against the before-image to derive the operation.
        loaded = loadNextLogRecord()
      case AS_IS =>
        // Persisted CDC log file: how much can be copied through depends on
        // the supplemental logging mode the writer used.
        val record = cdcLogRecordIterator.next().asInstanceOf[GenericRecord]
        cdcSupplementalLoggingMode match {
          case `DATA_BEFORE_AFTER` =>
            // Both images were logged: copy op, before and after through as-is.
            recordToLoad.update(0, convertToUTF8String(String.valueOf(record.get(0))))
            val before = record.get(2).asInstanceOf[GenericRecord]
            recordToLoad.update(2, recordToJsonAsUTF8String(before))
            val after = record.get(3).asInstanceOf[GenericRecord]
            recordToLoad.update(3, recordToJsonAsUTF8String(after))
          case `DATA_BEFORE` =>
            // Only the before-image was logged: the after-image is looked up
            // from the freshly read file slice (afterImageRecords) by key.
            val row = cdcRecordDeserializer.deserialize(record).get.asInstanceOf[InternalRow]
            val op = row.getString(0)
            val recordKey = row.getString(1)
            recordToLoad.update(0, convertToUTF8String(op))
            val before = record.get(2).asInstanceOf[GenericRecord]
            recordToLoad.update(2, recordToJsonAsUTF8String(before))
            parse(op) match {
              case INSERT =>
                recordToLoad.update(3, convertRowToJsonString(afterImageRecords(recordKey)))
              case UPDATE =>
                recordToLoad.update(3, convertRowToJsonString(afterImageRecords(recordKey)))
              case _ =>
                // Delete: no after-image.
                recordToLoad.update(3, null)
            }
          case _ =>
            // Key-only logging: both images must be reconstructed from the
            // in-memory before/after image maps by record key.
            val row = cdcRecordDeserializer.deserialize(record).get.asInstanceOf[InternalRow]
            val op = row.getString(0)
            val recordKey = row.getString(1)
            recordToLoad.update(0, convertToUTF8String(op))
            parse(op) match {
              case INSERT =>
                recordToLoad.update(2, null)
                recordToLoad.update(3, convertRowToJsonString(afterImageRecords(recordKey)))
              case UPDATE =>
                recordToLoad.update(2, recordToJsonAsUTF8String(beforeImageRecords(recordKey)))
                recordToLoad.update(3, convertRowToJsonString(afterImageRecords(recordKey)))
              case _ =>
                recordToLoad.update(2, recordToJsonAsUTF8String(beforeImageRecords(recordKey)))
                recordToLoad.update(3, null)
            }
        }
        loaded = true
      case REPLACE_COMMIT =>
        // Replaced file group: every record of the previous slice is a delete.
        val originRecord = recordIter.next()
        recordToLoad.update(2, convertRowToJsonString(originRecord))
        loaded = true
    }
    loaded
  }
+
+ /**
+ * Load the next log record and determine how to convert it to CDC format.
+ */
+ private def loadNextLogRecord(): Boolean = {
+ var loaded = false
+ val (key, logRecord) = logRecordIter.next()
+ val indexedRecord = getInsertValue(logRecord)
+ if (indexedRecord.isEmpty) {
+ // it's a deleted record.
+ val existingRecordOpt = beforeImageRecords.remove(key)
+ if (existingRecordOpt.isEmpty) {
+ // no real record is deleted, just ignore.
+ } else {
+ // there is a real record deleted.
+ recordToLoad.update(0, CDCRelation.CDC_OPERATION_DELETE)
+ recordToLoad.update(2, recordToJsonAsUTF8String(existingRecordOpt.get))
+ recordToLoad.update(3, null)
+ loaded = true
+ }
+ } else {
+ val existingRecordOpt = beforeImageRecords.get(key)
+ if (existingRecordOpt.isEmpty) {
+ // a new record is inserted.
+ val insertedRecord =
avroProjection(indexedRecord.get.asInstanceOf[GenericRecord])
+ recordToLoad.update(0, CDCRelation.CDC_OPERATION_INSERT)
+ recordToLoad.update(2, null)
+ recordToLoad.update(3, recordToJsonAsUTF8String(insertedRecord))
+ // insert into beforeImageRecords
+ beforeImageRecords(key) = insertedRecord
+ loaded = true
+ } else {
+ // an existing record is updated.
+ val existingRecord = existingRecordOpt.get
+ val merged = merge(existingRecord, logRecord)
+ val mergeRecord = avroProjection(merged.asInstanceOf[GenericRecord])
+ if (existingRecord != mergeRecord) {
+ recordToLoad.update(0, CDCRelation.CDC_OPERATION_UPDATE)
+ recordToLoad.update(2, recordToJsonAsUTF8String(existingRecord))
+ recordToLoad.update(3, recordToJsonAsUTF8String(mergeRecord))
+ // update into beforeImageRecords
+ beforeImageRecords(key) = mergeRecord
+ loaded = true
+ }
+ }
+ }
+ loaded
+ }
+
+ private def loadCdcFile(): Unit = {
+ // reset all the iterators first.
+ recordIter = Iterator.empty
+ logRecordIter = Iterator.empty
+ beforeImageRecords.clear()
+ afterImageRecords.clear()
+ if (cdcLogRecordIterator != null) {
+ cdcLogRecordIterator.close()
+ cdcLogRecordIterator = null
+ }
+
+ if (cdcFileIter.hasNext) {
+ val split = cdcFileIter.next()
+ currentInstant = split.getInstant
+ currentCDCFileSplit = split
+ currentCDCFileSplit.getCdcInferCase match {
+ case BASE_FILE_INSERT =>
+ assert(currentCDCFileSplit.getCdcFiles != null &&
currentCDCFileSplit.getCdcFiles.size() == 1)
+ val absCDCPath = new Path(basePath,
currentCDCFileSplit.getCdcFiles.get(0))
+ val fileStatus = fs.getFileStatus(absCDCPath)
+
+ val pf = sparkPartitionedFileUtils.createPartitionedFile(
+ InternalRow.empty, absCDCPath, 0, fileStatus.getLen)
+ recordIter = parquetReader(pf)
+ case BASE_FILE_DELETE =>
+ assert(currentCDCFileSplit.getBeforeFileSlice.isPresent)
+ recordIter =
loadFileSlice(currentCDCFileSplit.getBeforeFileSlice.get)
+ case LOG_FILE =>
+ assert(currentCDCFileSplit.getCdcFiles != null &&
currentCDCFileSplit.getCdcFiles.size() == 1
+ && currentCDCFileSplit.getBeforeFileSlice.isPresent)
+
loadBeforeFileSliceIfNeeded(currentCDCFileSplit.getBeforeFileSlice.get)
+ val absLogPath = new Path(basePath,
currentCDCFileSplit.getCdcFiles.get(0))
+ val morSplit = HoodieMergeOnReadFileSplit(None, List(new
HoodieLogFile(fs.getFileStatus(absLogPath))))
+ val logFileIterator = new LogFileIterator(morSplit,
originTableSchema, originTableSchema, tableState, conf)
+ logRecordIter = logFileIterator.logRecordsPairIterator
+ case AS_IS =>
+ assert(currentCDCFileSplit.getCdcFiles != null &&
!currentCDCFileSplit.getCdcFiles.isEmpty)
+ // load beforeFileSlice to beforeImageRecords
+ if (currentCDCFileSplit.getBeforeFileSlice.isPresent) {
+
loadBeforeFileSliceIfNeeded(currentCDCFileSplit.getBeforeFileSlice.get)
+ }
+ // load afterFileSlice to afterImageRecords
+ if (currentCDCFileSplit.getAfterFileSlice.isPresent) {
+ val iter =
loadFileSlice(currentCDCFileSplit.getAfterFileSlice.get())
+ afterImageRecords = mutable.Map.empty
+ iter.foreach { row =>
+ val key = getRecordKey(row)
+ afterImageRecords.put(key, row.copy())
+ }
+ }
+
+ val cdcLogFiles = currentCDCFileSplit.getCdcFiles.asScala.map {
cdcFile =>
+ new HoodieLogFile(fs.getFileStatus(new Path(basePath, cdcFile)))
+ }.toArray
+ cdcLogRecordIterator = new HoodieCDCLogRecordIterator(fs,
cdcLogFiles, cdcAvroSchema)
+ case REPLACE_COMMIT =>
+ if (currentCDCFileSplit.getBeforeFileSlice.isPresent) {
+
loadBeforeFileSliceIfNeeded(currentCDCFileSplit.getBeforeFileSlice.get)
+ }
+ recordIter = beforeImageRecords.values.map { record =>
+ deserialize(record)
+ }.iterator
+ beforeImageRecords.clear()
+ }
+ resetRecordFormat()
+ } else {
+ currentInstant = null
+ currentCDCFileSplit = null
+ }
+ }
+
+ /**
+ * Pre-initialize the invariant fields of the record to be returned, so that
+ * per-record processing only has to fill in the remaining fields.
+ */
+ private def resetRecordFormat(): Unit = {
+ recordToLoad = currentCDCFileSplit.getCdcInferCase match {
+ case BASE_FILE_INSERT =>
+ InternalRow.fromSeq(Array(
+ CDCRelation.CDC_OPERATION_INSERT,
convertToUTF8String(currentInstant),
+ null, null))
+ case BASE_FILE_DELETE =>
+ InternalRow.fromSeq(Array(
+ CDCRelation.CDC_OPERATION_DELETE,
convertToUTF8String(currentInstant),
+ null, null))
+ case LOG_FILE =>
+ InternalRow.fromSeq(Array(
+ null, convertToUTF8String(currentInstant),
+ null, null))
+ case AS_IS =>
+ InternalRow.fromSeq(Array(
+ null, convertToUTF8String(currentInstant),
+ null, null))
+ case REPLACE_COMMIT =>
+ InternalRow.fromSeq(Array(
+ CDCRelation.CDC_OPERATION_DELETE,
convertToUTF8String(currentInstant),
+ null, null))
+ }
+ }
+
+ /**
+ * If [[beforeImageFiles]] already matches exactly the list of files we need
+ * to load, reuse the cached before-image records directly.
+ * Otherwise, reload the required records from the file slice.
+ */
+ private def loadBeforeFileSliceIfNeeded(fileSlice: FileSlice): Unit = {
+ val files = List(fileSlice.getBaseFile.get().getPath) ++
+ fileSlice.getLogFiles.collect(Collectors.toList[HoodieLogFile]).asScala
+ .map(f => pathToString(f.getPath)).toList
+ val same = files.sorted == beforeImageFiles.sorted.toList
+ if (!same) {
+ // clear up the beforeImageRecords
+ beforeImageRecords.clear()
+ val iter = loadFileSlice(fileSlice)
+ iter.foreach { row =>
+ val key = getRecordKey(row)
+ // Spark's serializer reuses its internal buffer, so we must copy the
+ // serialized result in order to retain a stable reference to it
+ beforeImageRecords.put(key, serialize(row, copy = true))
+ }
+ // reset beforeImageFiles
+ beforeImageFiles.clear()
+ beforeImageFiles.append(files: _*)
+ }
+ }
+
+ private def loadFileSlice(fileSlice: FileSlice): Iterator[InternalRow] = {
+ val baseFileStatus = fs.getFileStatus(new
Path(fileSlice.getBaseFile.get().getPath))
+ val basePartitionedFile = sparkPartitionedFileUtils.createPartitionedFile(
+ InternalRow.empty,
+ baseFileStatus.getPath,
+ 0,
+ baseFileStatus.getLen
+ )
+ val logFiles = fileSlice.getLogFiles
+ .sorted(HoodieLogFile.getLogFileComparator)
+ .collect(Collectors.toList[HoodieLogFile])
+ .asScala.toList
+ .filterNot(_.getFileName.endsWith(HoodieCDCUtils.CDC_LOGFILE_SUFFIX))
+
+ if (logFiles.isEmpty) {
+ // no log files, just load the base parquet file
+ parquetReader(basePartitionedFile)
+ } else {
+ // use [[RecordMergingFileIterator]] to load both the base file and log
files
+ val morSplit = HoodieMergeOnReadFileSplit(Some(basePartitionedFile),
logFiles)
+ new RecordMergingFileIterator(
+ morSplit,
+ BaseFileReader(parquetReader, originTableSchema.structTypeSchema),
+ originTableSchema,
+ originTableSchema,
+ tableState,
+ conf)
+ }
+ }
+
+ /**
+ * Convert an InternalRow to a JSON string.
+ */
+ private def convertRowToJsonString(record: InternalRow): UTF8String = {
+ internalRowToJsonStringConverter.convert(record)
+ }
+
+ /**
+ * String data is stored in InternalRow as UTF8String, so convert accordingly.
+ */
+ private def convertToUTF8String(str: String): UTF8String = {
+ UTF8String.fromString(str)
+ }
+
+ private def pathToString(p: Path): String = {
+ p.toUri.toString
+ }
+
+ private def serialize(curRowRecord: InternalRow, copy: Boolean = false):
GenericRecord = {
+ val record = serializer.serialize(curRowRecord).asInstanceOf[GenericRecord]
+ if (copy) {
+ GenericData.get().deepCopy(record.getSchema, record)
+ } else {
+ record
+ }
+ }
+
+ private def recordToJsonAsUTF8String(record: GenericRecord): UTF8String = {
+ convertToUTF8String(HoodieCDCUtils.recordToJson(record))
+ }
+
+ private def getRecordKey(row: InternalRow): String = {
+ if (populateMetaFields) {
+
row.getString(structTypeSchema.fieldIndex(HoodieRecord.RECORD_KEY_METADATA_FIELD))
+ } else {
+ this.keyGenerator.getKey(serialize(row)).getRecordKey
+ }
+ }
+
+ private def getInsertValue(
+ record: HoodieRecord[_])
+ : Option[IndexedRecord] = {
+ toScalaOption(record.toIndexedRecord(avroSchema,
payloadProps)).map(_.getData)
+ }
+
+ private def merge(curAvroRecord: GenericRecord, newRecord: HoodieRecord[_]):
IndexedRecord = {
+
newRecord.getData.asInstanceOf[HoodieRecordPayload[_]].combineAndGetUpdateValue(
+ curAvroRecord, avroSchema, payloadProps).get()
+ }
+
+ override def close(): Unit = {}
+}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/HoodieMultipleBaseFileFormat.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/HoodieMultipleBaseFileFormat.scala
index c250a875f2b..0e657600e0d 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/HoodieMultipleBaseFileFormat.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/HoodieMultipleBaseFileFormat.scala
@@ -26,7 +26,7 @@ import
org.apache.hudi.DataSourceReadOptions.{REALTIME_PAYLOAD_COMBINE_OPT_VAL,
import org.apache.hudi.MergeOnReadSnapshotRelation.createPartitionedFile
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.{FileSlice, HoodieLogFile}
-import org.apache.hudi.{HoodieBaseRelation, HoodieTableSchema,
HoodieTableState, LogFileIterator, MergeOnReadSnapshotRelation,
PartitionFileSliceMapping, RecordMergingFileIterator, SparkAdapterSupport}
+import org.apache.hudi.{HoodieBaseRelation, HoodieTableSchema,
HoodieTableState, LogFileIterator, MergeOnReadSnapshotRelation,
HoodiePartitionFileSliceMapping, RecordMergingFileIterator, SparkAdapterSupport}
import org.apache.spark.broadcast.Broadcast
import
org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection
import org.apache.spark.sql.SparkSession
@@ -49,7 +49,9 @@ class HoodieMultipleBaseFileFormat(tableState:
Broadcast[HoodieTableState],
tableName: String,
mergeType: String,
mandatoryFields: Seq[String],
- isMOR: Boolean) extends FileFormat with
SparkAdapterSupport {
+ isMOR: Boolean,
+ isIncremental: Boolean,
+ requiredFilters: Seq[Filter]) extends
FileFormat with SparkAdapterSupport {
private val parquetFormat = new ParquetFileFormat()
private val orcFormat = new OrcFileFormat()
@@ -104,7 +106,9 @@ class HoodieMultipleBaseFileFormat(tableState:
Broadcast[HoodieTableState],
options: Map[String, String],
hadoopConf: Configuration):
PartitionedFile => Iterator[InternalRow] = {
val outputSchema = StructType(requiredSchema.fields ++
partitionSchema.fields)
- val requiredSchemaWithMandatory = if (!isMOR ||
MergeOnReadSnapshotRelation.isProjectionCompatible(tableState.value)) {
+ val requiredSchemaWithMandatory = if (isIncremental) {
+ StructType(dataSchema.toArray ++ partitionSchema.fields)
+ } else if (!isMOR ||
MergeOnReadSnapshotRelation.isProjectionCompatible(tableState.value)) {
// add mandatory fields to required schema
val added: mutable.Buffer[StructField] = mutable.Buffer[StructField]()
for (field <- mandatoryFields) {
@@ -120,14 +124,15 @@ class HoodieMultipleBaseFileFormat(tableState:
Broadcast[HoodieTableState],
}
val (parquetBaseFileReader, orcBaseFileReader,
preMergeParquetBaseFileReader, preMergeOrcBaseFileReader) = buildFileReaders(
- sparkSession, dataSchema, partitionSchema, requiredSchema, filters,
options, hadoopConf, requiredSchemaWithMandatory)
+ sparkSession, dataSchema, partitionSchema, if (isIncremental)
requiredSchemaWithMandatory else requiredSchema,
+ filters, options, hadoopConf, requiredSchemaWithMandatory)
val broadcastedHadoopConf = sparkSession.sparkContext.broadcast(new
SerializableConfiguration(hadoopConf))
(file: PartitionedFile) => {
val filePath =
sparkAdapter.getSparkPartitionedFileUtils.getPathFromPartitionedFile(file)
val fileFormat = detectFileFormat(filePath.toString)
file.partitionValues match {
- case fileSliceMapping: PartitionFileSliceMapping =>
+ case fileSliceMapping: HoodiePartitionFileSliceMapping =>
if (FSUtils.isLogFile(filePath)) {
// no base file
val fileSlice =
fileSliceMapping.getSlice(FSUtils.getFileId(filePath.getName).substring(1)).get
@@ -141,7 +146,7 @@ class HoodieMultipleBaseFileFormat(tableState:
Broadcast[HoodieTableState],
case Some(fileSlice) =>
val hoodieBaseFile = fileSlice.getBaseFile.get()
val baseFileFormat =
detectFileFormat(hoodieBaseFile.getFileName)
- val partitionValues = fileSliceMapping.getInternalRow
+ val partitionValues = fileSliceMapping.getPartitionValues
val logFiles = getLogFilesFromSlice(fileSlice)
if (requiredSchemaWithMandatory.isEmpty) {
val baseFile = createPartitionedFile(partitionValues,
hoodieBaseFile.getHadoopPath, 0, hoodieBaseFile.getFileLen)
@@ -198,20 +203,20 @@ class HoodieMultipleBaseFileFormat(tableState:
Broadcast[HoodieTableState],
PartitionedFile => Iterator[InternalRow],
PartitionedFile => Iterator[InternalRow]) = {
val parquetBaseFileReader =
parquetFormat.buildReaderWithPartitionValues(sparkSession, dataSchema,
partitionSchema, requiredSchema,
- filters, options, new Configuration(hadoopConf))
+ filters ++ requiredFilters, options, new Configuration(hadoopConf))
val orcBaseFileReader =
orcFormat.buildReaderWithPartitionValues(sparkSession, dataSchema,
partitionSchema, requiredSchema,
- filters, options, new Configuration(hadoopConf))
+ filters ++ requiredFilters, options, new Configuration(hadoopConf))
val preMergeParquetBaseFileReader = if (isMOR) {
parquetFormat.buildReaderWithPartitionValues(sparkSession, dataSchema,
StructType(Seq.empty),
- requiredSchemaWithMandatory, Seq.empty, options, new
Configuration(hadoopConf))
+ requiredSchemaWithMandatory, requiredFilters, options, new
Configuration(hadoopConf))
} else {
_: PartitionedFile => Iterator.empty
}
val preMergeOrcBaseFileReader = if (isMOR) {
orcFormat.buildReaderWithPartitionValues(sparkSession, dataSchema,
StructType(Seq.empty),
- requiredSchemaWithMandatory, Seq.empty, options, new
Configuration(hadoopConf))
+ requiredSchemaWithMandatory, requiredFilters, options, new
Configuration(hadoopConf))
} else {
_: PartitionedFile => Iterator.empty
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
index 189bcb39e61..d8278ea8218 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
@@ -26,8 +26,13 @@ import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.{FileSlice, HoodieBaseFile, HoodieLogFile,
HoodieRecord}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.read.HoodieFileGroupReader
+import org.apache.hudi.cdc.{CDCFileGroupIterator, CDCRelation,
HoodieCDCFileGroupSplit}
+import org.apache.hudi.common.config.TypedProperties
+import org.apache.hudi.common.model.HoodieFileGroupId
import org.apache.hudi.common.util.{Option => HOption}
-import org.apache.hudi.{HoodieBaseRelation, HoodieSparkUtils,
HoodieTableSchema, HoodieTableState, MergeOnReadSnapshotRelation,
PartitionFileSliceMapping, SparkAdapterSupport,
SparkFileFormatInternalRowReaderContext}
+import org.apache.hudi.{HoodieBaseRelation, HoodieFileIndex,
HoodiePartitionCDCFileGroupMapping,
+ HoodieSparkUtils, HoodieTableSchema, HoodieTableState,
MergeOnReadSnapshotRelation, HoodiePartitionFileSliceMapping,
+ SparkAdapterSupport, SparkFileFormatInternalRowReaderContext}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionedFile
@@ -35,6 +40,7 @@ import
org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isMetaField
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.util.SerializableConfiguration
+import org.apache.hudi.common.model.HoodieRecord.COMMIT_TIME_METADATA_FIELD_ORD
import scala.collection.JavaConverters._
import scala.collection.mutable
@@ -51,7 +57,9 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tableState:
HoodieTableState,
mandatoryFields: Seq[String],
isMOR: Boolean,
isBootstrap: Boolean,
- shouldUseRecordPosition:
Boolean
+ isIncremental: Boolean,
+ shouldUseRecordPosition:
Boolean,
+ requiredFilters: Seq[Filter]
) extends ParquetFileFormat with
SparkAdapterSupport {
var isProjected = false
@@ -70,33 +78,32 @@ class
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
supportBatchResult
}
- override def isSplitable(
- sparkSession: SparkSession,
- options: Map[String, String],
- path: Path): Boolean = {
- false
- }
+ override def isSplitable(sparkSession: SparkSession,
+ options: Map[String, String],
+ path: Path): Boolean = false
- override def buildReaderWithPartitionValues(sparkSession: SparkSession,
+ override def buildReaderWithPartitionValues(spark: SparkSession,
dataSchema: StructType,
partitionSchema: StructType,
requiredSchema: StructType,
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration):
PartitionedFile => Iterator[InternalRow] = {
- val requiredSchemaWithMandatory =
generateRequiredSchemaWithMandatory(requiredSchema, dataSchema)
+ val requiredSchemaWithMandatory =
generateRequiredSchemaWithMandatory(requiredSchema, dataSchema, partitionSchema)
val requiredSchemaSplits = requiredSchemaWithMandatory.fields.partition(f
=> HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.contains(f.name))
val requiredMeta = StructType(requiredSchemaSplits._1)
val requiredWithoutMeta = StructType(requiredSchemaSplits._2)
val augmentedHadoopConf = FSUtils.buildInlineConf(hadoopConf)
val (baseFileReader, preMergeBaseFileReader, _, _) = buildFileReaders(
- sparkSession, dataSchema, partitionSchema, requiredSchema, filters,
options, augmentedHadoopConf,
- requiredSchemaWithMandatory, requiredWithoutMeta, requiredMeta)
- val broadcastedHadoopConf = sparkSession.sparkContext.broadcast(new
SerializableConfiguration(augmentedHadoopConf))
+ spark, dataSchema, partitionSchema, if (isIncremental)
requiredSchemaWithMandatory else requiredSchema,
+ filters, options, augmentedHadoopConf, requiredSchemaWithMandatory,
requiredWithoutMeta, requiredMeta)
+ val broadcastedHadoopConf = spark.sparkContext.broadcast(new
SerializableConfiguration(augmentedHadoopConf))
+ val props: TypedProperties = HoodieFileIndex.getConfigProperties(spark,
options)
(file: PartitionedFile) => {
file.partitionValues match {
- case fileSliceMapping: PartitionFileSliceMapping =>
+ // Snapshot or incremental queries.
+ case fileSliceMapping: HoodiePartitionFileSliceMapping =>
val filePath =
sparkAdapter.getSparkPartitionedFileUtils.getPathFromPartitionedFile(file)
if (FSUtils.isLogFile(filePath)) {
// TODO: Use FileGroupReader here: HUDI-6942.
@@ -106,7 +113,7 @@ class
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
case Some(fileSlice) =>
val hoodieBaseFile = fileSlice.getBaseFile.get()
val bootstrapFileOpt = hoodieBaseFile.getBootstrapBaseFile
- val partitionValues = fileSliceMapping.getInternalRow
+ val partitionValues = fileSliceMapping.getPartitionValues
val logFiles = getLogFilesFromSlice(fileSlice)
if (requiredSchemaWithMandatory.isEmpty) {
val baseFile = createPartitionedFile(partitionValues,
hoodieBaseFile.getHadoopPath, 0, hoodieBaseFile.getFileLen)
@@ -137,12 +144,37 @@ class
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
case _ => baseFileReader(file)
}
}
+ // CDC queries.
+ case hoodiePartitionCDCFileGroupSliceMapping:
HoodiePartitionCDCFileGroupMapping =>
+ val filePath: Path =
sparkAdapter.getSparkPartitionedFileUtils.getPathFromPartitionedFile(file)
+ val fileGroupId: HoodieFileGroupId = new
HoodieFileGroupId(filePath.getParent.toString, filePath.getName)
+ val fileSplits =
hoodiePartitionCDCFileGroupSliceMapping.getFileSplitsFor(fileGroupId).get.toArray
+ val fileGroupSplit: HoodieCDCFileGroupSplit =
HoodieCDCFileGroupSplit(fileSplits)
+ buildCDCRecordIterator(fileGroupSplit, preMergeBaseFileReader,
broadcastedHadoopConf.value.value, requiredSchema, props)
// TODO: Use FileGroupReader here: HUDI-6942.
case _ => baseFileReader(file)
}
}
}
+ protected def buildCDCRecordIterator(cdcFileGroupSplit:
HoodieCDCFileGroupSplit,
+ preMergeBaseFileReader: PartitionedFile
=> Iterator[InternalRow],
+ hadoopConf: Configuration,
+ requiredSchema: StructType,
+ props: TypedProperties):
Iterator[InternalRow] = {
+ val metaClient =
HoodieTableMetaClient.initTableAndGetMetaClient(hadoopConf,
tableState.tablePath, props)
+ val cdcSchema = CDCRelation.FULL_CDC_SPARK_SCHEMA
+ new CDCFileGroupIterator(
+ cdcFileGroupSplit,
+ metaClient,
+ hadoopConf,
+ preMergeBaseFileReader,
+ tableSchema,
+ cdcSchema,
+ requiredSchema,
+ props)
+ }
+
protected def buildFileGroupIterator(preMergeBaseFileReader: PartitionedFile
=> Iterator[InternalRow],
partitionValues: InternalRow,
baseFile: HoodieBaseFile,
@@ -172,8 +204,12 @@ class
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
reader.getClosableIterator.asInstanceOf[java.util.Iterator[InternalRow]].asScala
}
- def generateRequiredSchemaWithMandatory(requiredSchema: StructType,
dataSchema: StructType): StructType = {
- if (!isMOR ||
MergeOnReadSnapshotRelation.isProjectionCompatible(tableState)) {
+ def generateRequiredSchemaWithMandatory(requiredSchema: StructType,
+ dataSchema: StructType,
+ partitionSchema: StructType):
StructType = {
+ if (isIncremental) {
+ StructType(dataSchema.toArray ++ partitionSchema.fields)
+ } else if (!isMOR ||
MergeOnReadSnapshotRelation.isProjectionCompatible(tableState)) {
val added: mutable.Buffer[StructField] = mutable.Buffer[StructField]()
for (field <- mandatoryFields) {
if (requiredSchema.getFieldIndex(field).isEmpty) {
@@ -197,15 +233,14 @@ class
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
PartitionedFile => Iterator[InternalRow],
PartitionedFile => Iterator[InternalRow]) = {
- //file reader when you just read a hudi parquet file and don't do any
merging
val baseFileReader = super.buildReaderWithPartitionValues(sparkSession,
dataSchema, partitionSchema, requiredSchema,
- filters, options, new Configuration(hadoopConf))
+ filters ++ requiredFilters, options, new Configuration(hadoopConf))
//file reader for reading a hudi base file that needs to be merged with
log files
val preMergeBaseFileReader = if (isMOR) {
// Add support for reading files using inline file system.
super.buildReaderWithPartitionValues(sparkSession, dataSchema,
partitionSchema,
- requiredSchemaWithMandatory, Seq.empty, options, new
Configuration(hadoopConf))
+ requiredSchemaWithMandatory, requiredFilters, options, new
Configuration(hadoopConf))
} else {
_: PartitionedFile => Iterator.empty
}
@@ -215,7 +250,6 @@ class
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
// 2. if we need to merge the bootstrap base and skeleton files then we
cannot filter
// 3. if we need to merge the bootstrap base and skeleton files then we
should never append partitions to the
// skeleton reader
-
val needMetaCols = requiredMeta.nonEmpty
val needDataCols = requiredWithoutMeta.nonEmpty
@@ -228,7 +262,7 @@ class
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
} else {
// filter and append
super.buildReaderWithPartitionValues(sparkSession,
HoodieSparkUtils.getMetaSchema, partitionSchema,
- requiredMeta, filters, options, new Configuration(hadoopConf))
+ requiredMeta, filters ++ requiredFilters, options, new
Configuration(hadoopConf))
}
} else {
_: PartitionedFile => Iterator.empty
@@ -248,7 +282,7 @@ class
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
} else {
// filter and append
super.buildReaderWithPartitionValues(sparkSession,
dataSchemaWithoutMeta, partitionSchema, requiredWithoutMeta,
- filters, options, new Configuration(hadoopConf))
+ filters ++ requiredFilters, options, new Configuration(hadoopConf))
}
} else {
_: PartitionedFile => Iterator.empty
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala
index a8ba96b9b71..af3cdf715e8 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala
@@ -26,7 +26,7 @@ import
org.apache.hudi.MergeOnReadSnapshotRelation.createPartitionedFile
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.{BaseFile, FileSlice, HoodieLogFile,
HoodieRecord}
import org.apache.hudi.common.util.ValidationUtils.checkState
-import org.apache.hudi.{HoodieBaseRelation, HoodieSparkUtils,
HoodieTableSchema, HoodieTableState, LogFileIterator,
MergeOnReadSnapshotRelation, PartitionFileSliceMapping,
RecordMergingFileIterator, SkipMergeIterator, SparkAdapterSupport}
+import org.apache.hudi.{HoodieBaseRelation, HoodieSparkUtils,
HoodieTableSchema, HoodieTableState, LogFileIterator,
MergeOnReadSnapshotRelation, HoodiePartitionFileSliceMapping,
RecordMergingFileIterator, SkipMergeIterator, SparkAdapterSupport}
import org.apache.spark.broadcast.Broadcast
import
org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection
import org.apache.spark.sql.SparkSession
@@ -51,7 +51,10 @@ class NewHoodieParquetFileFormat(tableState:
Broadcast[HoodieTableState],
mergeType: String,
mandatoryFields: Seq[String],
isMOR: Boolean,
- isBootstrap: Boolean) extends
ParquetFileFormat with SparkAdapterSupport {
+ isBootstrap: Boolean,
+ isIncremental: Boolean,
+ requiredFilters: Seq[Filter]
+ ) extends ParquetFileFormat with
SparkAdapterSupport {
override def isSplitable(sparkSession: SparkSession,
options: Map[String, String],
@@ -86,7 +89,9 @@ class NewHoodieParquetFileFormat(tableState:
Broadcast[HoodieTableState],
val outputSchema = StructType(requiredSchema.fields ++
partitionSchema.fields)
- val requiredSchemaWithMandatory = if (!isMOR ||
MergeOnReadSnapshotRelation.isProjectionCompatible(tableState.value)) {
+ val requiredSchemaWithMandatory = if (isIncremental) {
+ StructType(dataSchema.toArray ++ partitionSchema.fields)
+ } else if (!isMOR ||
MergeOnReadSnapshotRelation.isProjectionCompatible(tableState.value)) {
//add mandatory fields to required schema
val added: mutable.Buffer[StructField] = mutable.Buffer[StructField]()
for (field <- mandatoryFields) {
@@ -114,13 +119,14 @@ class NewHoodieParquetFileFormat(tableState:
Broadcast[HoodieTableState],
val bootstrapBaseAppend = needDataCols && isBootstrap && !isMOR &&
partitionSchema.nonEmpty
val (baseFileReader, preMergeBaseFileReader, skeletonReader,
bootstrapBaseReader) = buildFileReaders(sparkSession,
- dataSchema, partitionSchema, requiredSchema, filters, options,
hadoopConf, requiredSchemaWithMandatory,
+ dataSchema, partitionSchema, if (isIncremental)
requiredSchemaWithMandatory else requiredSchema,
+ filters, options, hadoopConf, requiredSchemaWithMandatory,
requiredWithoutMeta, requiredMeta)
val broadcastedHadoopConf = sparkSession.sparkContext.broadcast(new
SerializableConfiguration(hadoopConf))
(file: PartitionedFile) => {
file.partitionValues match {
- case fileSliceMapping: PartitionFileSliceMapping =>
+ case fileSliceMapping: HoodiePartitionFileSliceMapping =>
val filePath =
sparkAdapter.getSparkPartitionedFileUtils.getPathFromPartitionedFile(file)
if (FSUtils.isLogFile(filePath)) {
//no base file
@@ -135,7 +141,7 @@ class NewHoodieParquetFileFormat(tableState:
Broadcast[HoodieTableState],
case Some(fileSlice) =>
val hoodieBaseFile = fileSlice.getBaseFile.get()
val bootstrapFileOpt = hoodieBaseFile.getBootstrapBaseFile
- val partitionValues = fileSliceMapping.getInternalRow
+ val partitionValues = fileSliceMapping.getPartitionValues
val logFiles = getLogFilesFromSlice(fileSlice)
if (requiredSchemaWithMandatory.isEmpty) {
val baseFile = createPartitionedFile(partitionValues,
hoodieBaseFile.getHadoopPath, 0, hoodieBaseFile.getFileLen)
@@ -184,14 +190,13 @@ class NewHoodieParquetFileFormat(tableState:
Broadcast[HoodieTableState],
PartitionedFile => Iterator[InternalRow]) = {
//file reader when you just read a hudi parquet file and don't do any
merging
-
val baseFileReader = super.buildReaderWithPartitionValues(sparkSession,
dataSchema, partitionSchema, requiredSchema,
- filters, options, new Configuration(hadoopConf))
+ filters ++ requiredFilters, options, new Configuration(hadoopConf))
//file reader for reading a hudi base file that needs to be merged with
log files
val preMergeBaseFileReader = if (isMOR) {
super.buildReaderWithPartitionValues(sparkSession, dataSchema,
StructType(Seq.empty),
- requiredSchemaWithMandatory, Seq.empty, options, new
Configuration(hadoopConf))
+ requiredSchemaWithMandatory, requiredFilters, options, new
Configuration(hadoopConf))
} else {
_: PartitionedFile => Iterator.empty
}
@@ -234,7 +239,7 @@ class NewHoodieParquetFileFormat(tableState:
Broadcast[HoodieTableState],
} else {
// filter and append
super.buildReaderWithPartitionValues(sparkSession,
dataSchemaWithoutMeta, partitionSchema, requiredWithoutMeta,
- filters, options, new Configuration(hadoopConf))
+ filters ++ requiredFilters, options, new Configuration(hadoopConf))
}
} else {
_: PartitionedFile => Iterator.empty