This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 7aa92cb32038 [HUDI-9695] Use BufferedRecordMerger for spark cdc reader (#13694)
7aa92cb32038 is described below
commit 7aa92cb32038c4eeda80868431468d510e837f42
Author: Jon Vexler <[email protected]>
AuthorDate: Wed Aug 13 03:11:00 2025 -0400
[HUDI-9695] Use BufferedRecordMerger for spark cdc reader (#13694)
Co-authored-by: Jonathan Vexler <=>
---
.../common/table/read/HoodieFileGroupReader.java | 16 ++-
.../read/buffer/KeyBasedFileGroupRecordBuffer.java | 4 +
.../org/apache/hudi/cdc/CDCFileGroupIterator.scala | 149 ++++++++++++---------
.../scala/org/apache/hudi/cdc/HoodieCDCRDD.scala | 2 +-
.../cdc/InternalRowToJsonStringConverter.scala | 8 +-
.../cdc/TestInternalRowToJsonStringConverter.scala | 4 +-
.../sql/hudi/feature/TestCDCForSparkSQL.scala | 86 +++++++++++-
7 files changed, 188 insertions(+), 81 deletions(-)
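In short: the CDC read path stops merging raw InternalRows by hand and adopts the shared BufferedRecord/BufferedRecordMerger machinery. A minimal Scala sketch of the delta-merge contract the new code relies on (the types and the deltaMerge call are from this diff; the wrapper function itself is illustrative only):

    // Merge an incoming log record into the current before-image.
    // deltaMerge returns an empty Option when the update is elided,
    // in which case the current record stands (see merge() in
    // CDCFileGroupIterator below).
    def mergeImages(merger: BufferedRecordMerger[InternalRow],
                    current: BufferedRecord[InternalRow],
                    incoming: BufferedRecord[InternalRow]): BufferedRecord[InternalRow] = {
      val result = merger.deltaMerge(incoming, current)
      if (result.isEmpty) current else result.get()
    }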
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
index d4d7b64b1ee2..5615374c1313 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
@@ -280,22 +280,25 @@ public final class HoodieFileGroupReader<T> implements Closeable {
}
}
- private ClosableIterator<BufferedRecord<T>> getBufferedRecordIterator() throws IOException {
+ private ClosableIterator<BufferedRecord<T>> getBufferedRecordIterator(IteratorMode iteratorMode) throws IOException {
+ this.readerContext.setIteratorMode(iteratorMode);
initRecordIterators();
return new HoodieFileGroupReaderIterator<>(this);
}
+ public ClosableIterator<BufferedRecord<T>> getClosableBufferedRecordIterator() throws IOException {
+ return getBufferedRecordIterator(IteratorMode.HOODIE_RECORD);
+ }
+
public ClosableIterator<T> getClosableIterator() throws IOException {
- this.readerContext.setIteratorMode(IteratorMode.ENGINE_RECORD);
- return new CloseableMappingIterator<>(getBufferedRecordIterator(), BufferedRecord::getRecord);
+ return new CloseableMappingIterator<>(getBufferedRecordIterator(IteratorMode.ENGINE_RECORD), BufferedRecord::getRecord);
}
/**
* @return An iterator over the records that wraps the engine-specific record in a HoodieRecord.
*/
public ClosableIterator<HoodieRecord<T>> getClosableHoodieRecordIterator() throws IOException {
- this.readerContext.setIteratorMode(IteratorMode.HOODIE_RECORD);
- return new CloseableMappingIterator<>(getBufferedRecordIterator(),
+ return new CloseableMappingIterator<>(getBufferedRecordIterator(IteratorMode.HOODIE_RECORD),
bufferedRecord -> readerContext.getRecordContext().constructHoodieRecord(bufferedRecord));
}
@@ -303,8 +306,7 @@ public final class HoodieFileGroupReader<T> implements Closeable {
* @return A record key iterator over the records.
*/
public ClosableIterator<String> getClosableKeyIterator() throws IOException {
- this.readerContext.setIteratorMode(IteratorMode.RECORD_KEY);
- return new CloseableMappingIterator<>(getBufferedRecordIterator(), BufferedRecord::getRecordKey);
+ return new CloseableMappingIterator<>(getBufferedRecordIterator(IteratorMode.RECORD_KEY), BufferedRecord::getRecordKey);
}
public ClosableIterator<BufferedRecord<T>> getLogRecordsOnly() throws IOException {
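All three public iterators above now funnel through the single private getBufferedRecordIterator(IteratorMode) entry point, and the new getClosableBufferedRecordIterator() lets callers consume BufferedRecord instances directly. A hedged usage sketch (the withFileSlice call and the variable setup are assumptions; the other builder calls mirror loadFileSlice in the Scala diff below):

    val fileGroupReader = HoodieFileGroupReader.newBuilder()
      .withReaderContext(readerContext)        // pre-configured reader context (assumed)
      .withHoodieTableMetaClient(metaClient)
      .withFileSlice(fileSlice)                // assumed builder method for the slice to read
      .withProps(props)
      .withLatestCommitTime(latestCommitTime)
      .build()
    val iter = fileGroupReader.getClosableBufferedRecordIterator()
    try {
      while (iter.hasNext) {
        val rec = iter.next() // BufferedRecord[InternalRow]: record key, engine record, schema id
      }
    } finally {
      iter.close()
    }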
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/KeyBasedFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/KeyBasedFileGroupRecordBuffer.java
index 75fc3b7fd024..8fe84cbc2042 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/KeyBasedFileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/KeyBasedFileGroupRecordBuffer.java
@@ -156,4 +156,8 @@ public class KeyBasedFileGroupRecordBuffer<T> extends FileGroupRecordBuffer<T> {
// Handle records solely from log files.
return hasNextLogRecord();
}
+
+ public boolean isPartialMergingEnabled() {
+ return enablePartialMerging;
+ }
}
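The new accessor surfaces a flag that KeyBasedFileGroupRecordBuffer may flip internally while scanning log blocks. The CDC iterator polls it so it can rebuild its merger at most once, reproducing this pattern from merge() in the Scala diff below:

    // Upgrade to a partial-merge-aware merger the first time the
    // log-record buffer reports that partial merging kicked in.
    if (!isPartialMergeEnabled && keyBasedFileGroupRecordBuffer.isPresent &&
        keyBasedFileGroupRecordBuffer.get().isPartialMergingEnabled) {
      isPartialMergeEnabled = true
      bufferedRecordMerger = getBufferedRecordMerger
    }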
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
index 99e3fb23b594..18cfbf05fe9d 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
@@ -26,20 +26,20 @@ import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMemoryConfig, HoodieMetadataConfig, HoodieReaderConfig, TypedProperties}
import org.apache.hudi.common.config.HoodieCommonConfig.{DISK_MAP_BITCASK_COMPRESSION_ENABLED, SPILLABLE_DISK_MAP_TYPE}
import org.apache.hudi.common.config.HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH
-import org.apache.hudi.common.engine.HoodieReaderContext
import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.model.{FileSlice, HoodieLogFile, HoodieRecordMerger, HoodieSparkRecord}
+import org.apache.hudi.common.model.{FileSlice, HoodieLogFile, HoodieRecordMerger}
+import org.apache.hudi.common.model.HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID
import org.apache.hudi.common.serialization.DefaultSerializer
-import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.{HoodieTableMetaClient, PartialUpdateMode}
import org.apache.hudi.common.table.cdc.{HoodieCDCFileSplit, HoodieCDCUtils}
import org.apache.hudi.common.table.cdc.HoodieCDCInferenceCase._
import org.apache.hudi.common.table.cdc.HoodieCDCOperation._
import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode._
import org.apache.hudi.common.table.log.{HoodieCDCLogRecordIterator, HoodieMergedLogRecordReader}
-import org.apache.hudi.common.table.read.{BufferedRecord, FileGroupReaderSchemaHandler, HoodieFileGroupReader, HoodieReadStats, UpdateProcessor}
+import org.apache.hudi.common.table.read.{BufferedRecord, BufferedRecordMerger, BufferedRecordMergerFactory, BufferedRecords, FileGroupReaderSchemaHandler, HoodieFileGroupReader, HoodieReadStats, IteratorMode, UpdateProcessor}
import org.apache.hudi.common.table.read.buffer.KeyBasedFileGroupRecordBuffer
-import org.apache.hudi.common.util.{DefaultSizeEstimator, FileIOUtils, Option}
-import org.apache.hudi.common.util.collection.{ExternalSpillableMap, ImmutablePair}
+import org.apache.hudi.common.util.{DefaultSizeEstimator, FileIOUtils, HoodieRecordUtils, Option}
+import org.apache.hudi.common.util.collection.ExternalSpillableMap
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.data.CloseableIteratorListener
import org.apache.hudi.storage.{StorageConfiguration, StoragePath}
@@ -48,6 +48,7 @@ import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection
+import org.apache.spark.sql.HoodieInternalRowUtils
import org.apache.spark.sql.avro.HoodieAvroDeserializer
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Projection
@@ -75,6 +76,26 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
extends Iterator[InternalRow]
with SparkAdapterSupport with AvroDeserializerSupport with Closeable {
+ private lazy val readerContext = {
+ val bufferedReaderContext = new SparkFileFormatInternalRowReaderContext(baseFileReader,
+ Seq.empty, Seq.empty, conf, metaClient.getTableConfig)
+ bufferedReaderContext.initRecordMerger(readerProperties)
+ bufferedReaderContext
+ }
+
+ private lazy val orderingFieldNames = HoodieRecordUtils.getOrderingFieldNames(readerContext.getMergeMode, props, metaClient)
+ private lazy val payloadClass: Option[String] = if (recordMerger.getMergingStrategy == PAYLOAD_BASED_MERGE_STRATEGY_UUID) {
+ Option.of(metaClient.getTableConfig.getPayloadClass)
+ } else {
+ Option.empty.asInstanceOf[Option[String]]
+ }
+ private lazy val partialUpdateMode: PartialUpdateMode = metaClient.getTableConfig.getPartialUpdateMode
+ private var isPartialMergeEnabled = false
+ private var bufferedRecordMerger = getBufferedRecordMerger
+ private def getBufferedRecordMerger: BufferedRecordMerger[InternalRow] = BufferedRecordMergerFactory.create(readerContext,
+ readerContext.getMergeMode, isPartialMergeEnabled, Option.of(recordMerger), orderingFieldNames,
+ payloadClass, avroSchema, props, partialUpdateMode)
+
private lazy val storage = metaClient.getStorage
private lazy val basePath = metaClient.getBasePath
@@ -100,11 +121,7 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
readerProps
}
- private lazy val recordMerger: HoodieRecordMerger = {
- val readerContext: HoodieReaderContext[InternalRow] = new SparkFileFormatInternalRowReaderContext(baseFileReader, Seq.empty, Seq.empty, conf, metaClient.getTableConfig)
- readerContext.initRecordMerger(props)
- readerContext.getRecordMerger.get()
- }
+ private lazy val recordMerger: HoodieRecordMerger = readerContext.getRecordMerger().get()
protected override val avroSchema: Schema = new Schema.Parser().parse(originTableSchema.avroSchemaStr)
@@ -145,13 +162,14 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
* 2) when the type of cdc file is 'REPLACE_COMMIT',
* use this to trace the records that are converted from the '[[beforeImageRecords]]
*/
- private var recordIter: Iterator[InternalRow] = Iterator.empty
+ private var recordIter: Iterator[BufferedRecord[InternalRow]] = Iterator.empty
/**
* Only one case where it will be used is that extract the change data from log files for mor table.
* At the time, 'logRecordIter' will work with [[beforeImageRecords]] that keep all the records of the previous file slice.
*/
private var logRecordIter: Iterator[BufferedRecord[InternalRow]] = Iterator.empty
+ private var keyBasedFileGroupRecordBuffer: Option[KeyBasedFileGroupRecordBuffer[InternalRow]] = Option.empty.asInstanceOf[Option[KeyBasedFileGroupRecordBuffer[InternalRow]]]
/**
* Only one case where it will be used is that extract the change data from cdc log files.
@@ -175,26 +193,26 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
* 1) the cdc infer case is [[LOG_FILE]];
* 2) the cdc infer case is [[AS_IS]] and [[cdcSupplementalLoggingMode]] is 'op_key'.
*/
- private val beforeImageRecords: mutable.Map[String, InternalRow] = mutable.Map.empty
+ private val beforeImageRecords: mutable.Map[String, BufferedRecord[InternalRow]] = mutable.Map.empty
/**
* Keep the after-image data. Only one case will use this:
* the cdc infer case is [[AS_IS]] and [[cdcSupplementalLoggingMode]] is [[OP_KEY_ONLY]] or [[DATA_BEFORE]].
*/
- private val afterImageRecords: util.Map[String, InternalRow] = new ExternalSpillableMap[String, InternalRow](
+ private val afterImageRecords: util.Map[String, BufferedRecord[InternalRow]] = new ExternalSpillableMap[String, BufferedRecord[InternalRow]](
props.getLong(HoodieWriteConfig.CDC_FILE_GROUP_ITERATOR_MEMORY_SPILL_BYTES.key(),
HoodieWriteConfig.CDC_FILE_GROUP_ITERATOR_MEMORY_SPILL_BYTES.defaultValue()),
props.getString(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.key,
FileIOUtils.getDefaultSpillableMapBasePath),
new DefaultSizeEstimator[String],
- new DefaultSizeEstimator[InternalRow],
+ new DefaultSizeEstimator[BufferedRecord[InternalRow]],
ExternalSpillableMap.DiskMapType.valueOf(props.getString(
SPILLABLE_DISK_MAP_TYPE.key(),
SPILLABLE_DISK_MAP_TYPE.defaultValue().toString)
.toUpperCase(Locale.ROOT)),
- new DefaultSerializer[InternalRow],
+ new DefaultSerializer[BufferedRecord[InternalRow]],
props.getBoolean(DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()),
getClass.getSimpleName)
- private val internalRowToJsonStringConverter = new InternalRowToJsonStringConverter(originTableSchema)
+ private val internalRowToJsonStringConverterMap: mutable.Map[Integer, InternalRowToJsonStringConverter] = mutable.Map.empty
private def needLoadNextFile: Boolean = {
!recordIter.hasNext &&
@@ -243,11 +261,11 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
currentCDCFileSplit.getCdcInferCase match {
case BASE_FILE_INSERT =>
val originRecord = recordIter.next()
- recordToLoad.update(3, convertRowToJsonString(originRecord))
+ recordToLoad.update(3, convertBufferedRecordToJsonString(originRecord))
loaded = true
case BASE_FILE_DELETE =>
val originRecord = recordIter.next()
- recordToLoad.update(2, convertRowToJsonString(originRecord))
+ recordToLoad.update(2, convertBufferedRecordToJsonString(originRecord))
loaded = true
case LOG_FILE =>
loaded = loadNextLogRecord()
@@ -269,9 +287,9 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
recordToLoad.update(2, recordToJsonAsUTF8String(before))
parse(op) match {
case INSERT =>
- recordToLoad.update(3, convertRowToJsonString(afterImageRecords.get(recordKey)))
+ recordToLoad.update(3, convertBufferedRecordToJsonString(afterImageRecords.get(recordKey)))
case UPDATE =>
- recordToLoad.update(3, convertRowToJsonString(afterImageRecords.get(recordKey)))
+ recordToLoad.update(3, convertBufferedRecordToJsonString(afterImageRecords.get(recordKey)))
case _ =>
recordToLoad.update(3, null)
}
@@ -283,19 +301,19 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
parse(op) match {
case INSERT =>
recordToLoad.update(2, null)
- recordToLoad.update(3, convertRowToJsonString(afterImageRecords.get(recordKey)))
+ recordToLoad.update(3, convertBufferedRecordToJsonString(afterImageRecords.get(recordKey)))
case UPDATE =>
- recordToLoad.update(2, convertRowToJsonString(beforeImageRecords(recordKey)))
- recordToLoad.update(3, convertRowToJsonString(afterImageRecords.get(recordKey)))
+ recordToLoad.update(2, convertBufferedRecordToJsonString(beforeImageRecords(recordKey)))
+ recordToLoad.update(3, convertBufferedRecordToJsonString(afterImageRecords.get(recordKey)))
case _ =>
- recordToLoad.update(2, convertRowToJsonString(beforeImageRecords(recordKey)))
+ recordToLoad.update(2, convertBufferedRecordToJsonString(beforeImageRecords(recordKey)))
recordToLoad.update(3, null)
}
}
loaded = true
case REPLACE_COMMIT =>
val originRecord = recordIter.next()
- recordToLoad.update(2, convertRowToJsonString(originRecord))
+ recordToLoad.update(2, convertBufferedRecordToJsonString(originRecord))
loaded = true
}
loaded
@@ -315,7 +333,7 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
} else {
// there is a real record deleted.
recordToLoad.update(0, CDCRelation.CDC_OPERATION_DELETE)
- recordToLoad.update(2, convertRowToJsonString(existingRecordOpt.get))
+ recordToLoad.update(2, convertBufferedRecordToJsonString(existingRecordOpt.get))
recordToLoad.update(3, null)
loaded = true
}
@@ -325,18 +343,18 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
// a new record is inserted.
recordToLoad.update(0, CDCRelation.CDC_OPERATION_INSERT)
recordToLoad.update(2, null)
- recordToLoad.update(3, convertRowToJsonString(logRecord.getRecord))
+ recordToLoad.update(3, convertBufferedRecordToJsonString(logRecord))
// insert into beforeImageRecords
- beforeImageRecords(logRecord.getRecordKey) = logRecord.getRecord
+ beforeImageRecords(logRecord.getRecordKey) = logRecord
loaded = true
} else {
// a existed record is updated.
val existingRecord = existingRecordOpt.get
- val mergeRecord = merge(existingRecord, logRecord.getRecord).getLeft.getData
+ val mergeRecord = merge(existingRecord, logRecord)
if (existingRecord != mergeRecord) {
recordToLoad.update(0, CDCRelation.CDC_OPERATION_UPDATE)
- recordToLoad.update(2, convertRowToJsonString(existingRecord))
- recordToLoad.update(3, convertRowToJsonString(mergeRecord))
+ recordToLoad.update(2, convertBufferedRecordToJsonString(existingRecord))
+ recordToLoad.update(3, convertBufferedRecordToJsonString(mergeRecord))
// update into beforeImageRecords
beforeImageRecords(logRecord.getRecordKey) = mergeRecord
loaded = true
@@ -350,6 +368,8 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
// reset all the iterator first.
recordIter = Iterator.empty
logRecordIter = Iterator.empty
+ keyBasedFileGroupRecordBuffer.ifPresent(k => k.close())
+ keyBasedFileGroupRecordBuffer = Option.empty.asInstanceOf[Option[KeyBasedFileGroupRecordBuffer[InternalRow]]]
beforeImageRecords.clear()
afterImageRecords.clear()
if (cdcLogRecordIterator != null) {
@@ -371,6 +391,7 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
InternalRow.empty, absCDCPath, 0, fileStatus.getLength)
recordIter = baseFileReader.read(pf,
originTableSchema.structTypeSchema, new StructType(),
toJavaOption(originTableSchema.internalSchema), Seq.empty, conf)
+ .map(record => BufferedRecords.fromEngineRecord(record, avroSchema, readerContext.getRecordContext, orderingFieldNames, false))
case BASE_FILE_DELETE =>
assert(currentCDCFileSplit.getBeforeFileSlice.isPresent)
recordIter = loadFileSlice(currentCDCFileSplit.getBeforeFileSlice.get)
@@ -389,10 +410,10 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
}
// load afterFileSlice to afterImageRecords
if (currentCDCFileSplit.getAfterFileSlice.isPresent) {
- val iter = loadFileSliceWithKeys(currentCDCFileSplit.getAfterFileSlice.get())
+ val iter = loadFileSlice(currentCDCFileSplit.getAfterFileSlice.get())
afterImageRecords.clear()
- iter.foreach { tuple =>
- afterImageRecords.put(tuple._1, tuple._2)
+ iter.foreach { bufferedRecord =>
+ afterImageRecords.put(bufferedRecord.getRecordKey, bufferedRecord)
}
}
@@ -453,9 +474,9 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
if (!same) {
// clear up the beforeImageRecords
beforeImageRecords.clear()
- val iter = loadFileSliceWithKeys(fileSlice)
- iter.foreach { tuple =>
- beforeImageRecords.put(tuple._1, tuple._2)
+ val iter = loadFileSlice(fileSlice)
+ iter.foreach { bufferedRecord =>
+ beforeImageRecords.put(bufferedRecord.getRecordKey, bufferedRecord)
}
// reset beforeImageFiles
beforeImageFiles.clear()
@@ -463,22 +484,11 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
}
}
- private def loadFileSliceWithKeys(fileSlice: FileSlice): Iterator[(String, InternalRow)] = {
- val readerContext = new SparkFileFormatInternalRowReaderContext(baseFileReader, Seq.empty, Seq.empty,
- conf, metaClient.getTableConfig)
- loadFileSlice(fileSlice, readerContext).map(internalRow => {
- val recordKey = readerContext.getRecordContext().getRecordKey(internalRow, avroSchema)
- (recordKey, internalRow)
- })
- }
-
- private def loadFileSlice(fileSlice: FileSlice): Iterator[InternalRow] = {
- val readerContext = new SparkFileFormatInternalRowReaderContext(baseFileReader, Seq.empty, Seq.empty,
- conf, metaClient.getTableConfig)
+ private def loadFileSlice(fileSlice: FileSlice): Iterator[BufferedRecord[InternalRow]] = {
loadFileSlice(fileSlice, readerContext)
}
- private def loadFileSlice(fileSlice: FileSlice, readerContext: SparkFileFormatInternalRowReaderContext): Iterator[InternalRow] = {
+ private def loadFileSlice(fileSlice: FileSlice, readerContext: SparkFileFormatInternalRowReaderContext): Iterator[BufferedRecord[InternalRow]] = {
val fileGroupReader = HoodieFileGroupReader.newBuilder()
.withReaderContext(readerContext)
.withHoodieTableMetaClient(metaClient)
@@ -489,24 +499,22 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
.withProps(readerProperties)
.withLatestCommitTime(split.changes.last.getInstant)
.build()
- CloseableIteratorListener.addListener(fileGroupReader.getClosableIterator).asScala
+ CloseableIteratorListener.addListener(fileGroupReader.getClosableBufferedRecordIterator).asScala
}
private def loadLogFile(logFile: HoodieLogFile, instant: String): Iterator[BufferedRecord[InternalRow]] = {
val partitionPath = FSUtils.getRelativePartitionPath(metaClient.getBasePath, logFile.getPath.getParent)
- val readerContext = new
SparkFileFormatInternalRowReaderContext(baseFileReader, Seq.empty, Seq.empty,
- conf, metaClient.getTableConfig)
readerContext.setLatestCommitTime(instant)
readerContext.setHasBootstrapBaseFile(false)
readerContext.setHasLogFiles(true)
- readerContext.initRecordMerger(readerProperties)
readerContext.setSchemaHandler(
new FileGroupReaderSchemaHandler[InternalRow](readerContext, avroSchema, avroSchema,
Option.empty(), metaClient.getTableConfig, readerProperties))
val stats = new HoodieReadStats
- val recordBuffer = new KeyBasedFileGroupRecordBuffer[InternalRow](readerContext, metaClient, readerContext.getMergeMode,
+ keyBasedFileGroupRecordBuffer.ifPresent(k => k.close())
+ keyBasedFileGroupRecordBuffer = Option.of(new KeyBasedFileGroupRecordBuffer[InternalRow](readerContext, metaClient, readerContext.getMergeMode,
metaClient.getTableConfig.getPartialUpdateMode, readerProperties, metaClient.getTableConfig.getPreCombineFields,
- UpdateProcessor.create(stats, readerContext, true, Option.empty()))
+ UpdateProcessor.create(stats, readerContext, true, Option.empty())))
HoodieMergedLogRecordReader.newBuilder[InternalRow]
.withStorage(metaClient.getStorage)
@@ -516,17 +524,19 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
.withBufferSize(HoodieMetadataConfig.MAX_READER_BUFFER_SIZE_PROP.defaultValue)
.withPartition(partitionPath)
.withMetaClient(metaClient)
- .withRecordBuffer(recordBuffer)
+ .withRecordBuffer(keyBasedFileGroupRecordBuffer.get())
.build
- CloseableIteratorListener.addListener(recordBuffer.getLogRecordIterator).asScala
+ CloseableIteratorListener.addListener(keyBasedFileGroupRecordBuffer.get().getLogRecordIterator).asScala
}
/**
* Convert InternalRow to json string.
*/
- private def convertRowToJsonString(record: InternalRow): UTF8String = {
- internalRowToJsonStringConverter.convert(record)
+ private def convertBufferedRecordToJsonString(record: BufferedRecord[InternalRow]): UTF8String = {
+ internalRowToJsonStringConverterMap.getOrElseUpdate(record.getSchemaId,
+ new InternalRowToJsonStringConverter(HoodieInternalRowUtils.getCachedSchema(readerContext.getRecordContext.decodeAvroSchema(record.getSchemaId))))
+ .convert(record.getRecord)
}
/**
@@ -540,15 +550,24 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
convertToUTF8String(HoodieCDCUtils.recordToJson(record))
}
- private def merge(currentRecord: InternalRow, newRecord: InternalRow): ImmutablePair[HoodieSparkRecord, Schema] = {
- recordMerger.merge(new HoodieSparkRecord(currentRecord, structTypeSchema), avroSchema,
- new HoodieSparkRecord(newRecord, structTypeSchema), avroSchema, props)
- .get().asInstanceOf[ImmutablePair[HoodieSparkRecord, Schema]]
+ private def merge(currentRecord: BufferedRecord[InternalRow], newRecord: BufferedRecord[InternalRow]): BufferedRecord[InternalRow] = {
+ if (!isPartialMergeEnabled && keyBasedFileGroupRecordBuffer.isPresent && keyBasedFileGroupRecordBuffer.get().isPartialMergingEnabled) {
+ isPartialMergeEnabled = true
+ bufferedRecordMerger = getBufferedRecordMerger
+ }
+ val deltaMergeResult = bufferedRecordMerger.deltaMerge(newRecord, currentRecord)
+ if (deltaMergeResult.isEmpty) {
+ currentRecord
+ } else {
+ deltaMergeResult.get()
+ }
}
override def close(): Unit = {
recordIter = Iterator.empty
logRecordIter = Iterator.empty
+ keyBasedFileGroupRecordBuffer.ifPresent(k => k.close())
+ keyBasedFileGroupRecordBuffer = Option.empty.asInstanceOf[Option[KeyBasedFileGroupRecordBuffer[InternalRow]]]
beforeImageRecords.clear()
afterImageRecords.clear()
if (cdcLogRecordIterator != null) {
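One detail worth noting from the iterator rewrite above: BufferedRecords can carry different schema versions (for example under partial updates), so the single table-wide JSON converter becomes a per-schema-id cache. A condensed sketch of that pattern, with names as in the diff:

    // Memoize one converter per schema id; decodeAvroSchema resolves the
    // id back to the Avro schema the record was buffered with.
    private val converters = mutable.Map.empty[Integer, InternalRowToJsonStringConverter]

    private def toJson(rec: BufferedRecord[InternalRow]): UTF8String =
      converters.getOrElseUpdate(rec.getSchemaId,
        new InternalRowToJsonStringConverter(HoodieInternalRowUtils.getCachedSchema(
          readerContext.getRecordContext.decodeAvroSchema(rec.getSchemaId))))
        .convert(rec.getRecord)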
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
index 0f5f90d49efc..edb5ea7ff10a 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
@@ -238,7 +238,7 @@ class HoodieCDCRDD(
*/
private var afterImageRecords: mutable.Map[String, InternalRow] = mutable.Map.empty
- private var internalRowToJsonStringConverter = new InternalRowToJsonStringConverter(originTableSchema)
+ private var internalRowToJsonStringConverter = new InternalRowToJsonStringConverter(originTableSchema.structTypeSchema)
private def needLoadNextFile: Boolean = {
!recordIter.hasNext &&
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/InternalRowToJsonStringConverter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/InternalRowToJsonStringConverter.scala
index 2bdb07418c9f..62dd08bebe00 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/InternalRowToJsonStringConverter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/InternalRowToJsonStringConverter.scala
@@ -18,16 +18,14 @@
package org.apache.hudi.cdc
-import org.apache.hudi.HoodieTableSchema
-
import com.fasterxml.jackson.annotation.JsonInclude.Include
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.types.StringType
+import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.unsafe.types.UTF8String
-class InternalRowToJsonStringConverter(originTableSchema: HoodieTableSchema) {
+class InternalRowToJsonStringConverter(schema: StructType) {
private lazy val mapper: ObjectMapper = {
val _mapper = new ObjectMapper
@@ -39,7 +37,7 @@ class InternalRowToJsonStringConverter(originTableSchema: HoodieTableSchema) {
def convert(record: InternalRow): UTF8String = {
val map = scala.collection.mutable.Map.empty[String, Any]
- originTableSchema.structTypeSchema.zipWithIndex.foreach {
+ schema.zipWithIndex.foreach {
case (field, idx) =>
if (field.dataType.isInstanceOf[StringType]) {
map(field.name) = Option(record.getUTF8String(idx)).map(_.toString).orNull
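Because the converter now takes a plain StructType instead of a HoodieTableSchema, it can be instantiated for any projected or versioned schema. A small usage sketch (schema and values are illustrative, not from the diff):

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.types.{StringType, StructField, StructType}
    import org.apache.spark.unsafe.types.UTF8String

    val schema = StructType(Seq(StructField("id", StringType), StructField("name", StringType)))
    val converter = new InternalRowToJsonStringConverter(schema)
    val json: UTF8String =
      converter.convert(InternalRow(UTF8String.fromString("1"), UTF8String.fromString("a1")))
    // yields roughly: {"id":"1","name":"a1"}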
diff --git a/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/hudi/cdc/TestInternalRowToJsonStringConverter.scala b/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/hudi/cdc/TestInternalRowToJsonStringConverter.scala
index 8104b5470b70..90ed213278af 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/hudi/cdc/TestInternalRowToJsonStringConverter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/hudi/cdc/TestInternalRowToJsonStringConverter.scala
@@ -28,11 +28,11 @@ import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.api.Test
class TestInternalRowToJsonStringConverter {
- private val converter = new InternalRowToJsonStringConverter(hoodieTableSchema)
+ private val converter = new InternalRowToJsonStringConverter(hoodieTableSchema.structTypeSchema)
@Test
def emptyRow(): Unit = {
- val converter = new InternalRowToJsonStringConverter(emptyHoodieTableSchema)
+ val converter = new InternalRowToJsonStringConverter(emptyHoodieTableSchema.structTypeSchema)
val row = InternalRow.empty
val converted = converter.convert(row)
assertEquals("{}", converted.toString)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/TestCDCForSparkSQL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/TestCDCForSparkSQL.scala
index 31e101302880..7c6304dfa49f 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/TestCDCForSparkSQL.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/TestCDCForSparkSQL.scala
@@ -20,8 +20,11 @@
package org.apache.spark.sql.hudi.feature
import org.apache.hudi.DataSourceReadOptions._
-import org.apache.hudi.DataSourceWriteOptions.SPARK_SQL_INSERT_INTO_OPERATION
+import org.apache.hudi.DataSourceWriteOptions
+import org.apache.hudi.DataSourceWriteOptions.{ENABLE_MERGE_INTO_PARTIAL_UPDATES, SPARK_SQL_INSERT_INTO_OPERATION}
+import org.apache.hudi.common.config.HoodieReaderConfig
import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode.{DATA_BEFORE, DATA_BEFORE_AFTER, OP_KEY_ONLY}
+import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient
import org.apache.spark.sql.DataFrame
@@ -304,4 +307,85 @@ class TestCDCForSparkSQL extends HoodieSparkSqlTestBase {
}
}
}
+
+ test("Test Partial Updates With Spark CDC") {
+ val databaseName = "hudi_database"
+ spark.sql(s"create database if not exists $databaseName")
+ spark.sql(s"use $databaseName")
+ withSQLConf(HoodieWriteConfig.MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key -> "0",
+ DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key -> "true",
+ HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key -> "true") {
+ Seq(OP_KEY_ONLY, DATA_BEFORE, DATA_BEFORE_AFTER).foreach { loggingMode =>
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ val basePath = s"${tmp.getCanonicalPath}/$tableName"
+ spark.sql(
+ s"""
+ | create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long,
+ | pt string
+ | ) using hudi
+ | partitioned by (pt)
+ | tblproperties (
+ | 'primaryKey' = 'id',
+ | 'preCombineField' = 'ts',
+ | 'hoodie.table.cdc.enabled' = 'true',
+ | 'hoodie.table.cdc.supplemental.logging.mode' = '${loggingMode.name()}',
+ | '${ENABLE_MERGE_INTO_PARTIAL_UPDATES.key}' = 'true',
+ | 'type' = 'mor'
+ | )
+ | location '$basePath'
+ """.stripMargin)
+
+ val metaClient = createMetaClient(spark, basePath)
+
+ spark.sql(
+ s"""
+ | insert into $tableName values
+ | (1, 'a1', 11, 1000, '2021'),
+ | (2, 'a2', 12, 1000, '2022'),
+ | (3, 'a3', 13, 1000, '2022')
+ """.stripMargin)
+ val commitTime1 = metaClient.reloadActiveTimeline.lastInstant().get().requestedTime
+ val cdcDataOnly1 = cdcDataFrame(basePath, commitTime1.toLong - 1)
+ cdcDataOnly1.show(false)
+ assertCDCOpCnt(cdcDataOnly1, 3, 0, 0)
+
+ spark.sql(s"insert overwrite table $tableName partition (pt =
'2021') values (1, 'a1_v2', 11, 1100)")
+ val commitTime2 = metaClient.reloadActiveTimeline.lastInstant().get().requestedTime
+ val cdcDataOnly2 = cdcDataFrame(basePath, commitTime2.toLong - 1)
+ cdcDataOnly2.show(false)
+ assertCDCOpCnt(cdcDataOnly2, 1, 0, 1)
+
+ spark.sql(s"update $tableName set name = 'a2_v2', ts = 1200 where id
= 2")
+ val commitTime3 = metaClient.reloadActiveTimeline.lastInstant().get().requestedTime
+ val cdcDataOnly3 = cdcDataFrame(basePath, commitTime3.toLong - 1)
+ cdcDataOnly3.show(false)
+ assertCDCOpCnt(cdcDataOnly3, 0, 1, 0)
+
+ spark.sql(
+ s"""
+ | merge into $tableName
+ | using (
+ | select * from (
+ | select 1 as id, 'a1_v3' as name, cast(1300 as long) as ts, "2021" as pt
+ | )
+ | ) s0
+ | on s0.id = $tableName.id
+ | when matched then update set id = s0.id, name = s0.name, ts = s0.ts, pt = s0.pt
+ """.stripMargin)
+ val commitTime4 = metaClient.reloadActiveTimeline.lastInstant().get().requestedTime
+ val cdcDataOnly4 = cdcDataFrame(basePath, commitTime4.toLong - 1)
+ cdcDataOnly4.show(false)
+ assertCDCOpCnt(cdcDataOnly4, 0, 1, 0)
+
+ val totalCdcData = cdcDataFrame(basePath, commitTime1.toLong - 1)
+ assertCDCOpCnt(totalCdcData, 4, 2, 1)
+ }
+ }
+ }
+ }
}
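For context, the cdcDataFrame helper the new test calls is assumed to wrap a standard incremental CDC read along these lines (the option constants are DataSourceReadOptions members already imported by this test; the helper's actual body lives in the test base class and is not part of this diff):

    val cdcDf = spark.read.format("hudi")
      .option(QUERY_TYPE.key, QUERY_TYPE_INCREMENTAL_OPT_VAL)
      .option(INCREMENTAL_FORMAT.key, INCREMENTAL_FORMAT_CDC_VAL)
      .option(START_COMMIT.key, startTimestamp.toString) // e.g. commitTime1.toLong - 1
      .load(basePath)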