This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 7aa92cb32038 [HUDI-9695] Use BufferedRecordMerger for spark cdc reader (#13694)
7aa92cb32038 is described below

commit 7aa92cb32038c4eeda80868431468d510e837f42
Author: Jon Vexler <[email protected]>
AuthorDate: Wed Aug 13 03:11:00 2025 -0400

    [HUDI-9695] Use BufferedRecordMerger for spark cdc reader (#13694)
    
    Co-authored-by: Jonathan Vexler <=>
---
 .../common/table/read/HoodieFileGroupReader.java   |  16 ++-
 .../read/buffer/KeyBasedFileGroupRecordBuffer.java |   4 +
 .../org/apache/hudi/cdc/CDCFileGroupIterator.scala | 149 ++++++++++++---------
 .../scala/org/apache/hudi/cdc/HoodieCDCRDD.scala   |   2 +-
 .../cdc/InternalRowToJsonStringConverter.scala     |   8 +-
 .../cdc/TestInternalRowToJsonStringConverter.scala |   4 +-
 .../sql/hudi/feature/TestCDCForSparkSQL.scala      |  86 +++++++++++-
 7 files changed, 188 insertions(+), 81 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
index d4d7b64b1ee2..5615374c1313 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
@@ -280,22 +280,25 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
     }
   }
 
-  private ClosableIterator<BufferedRecord<T>> getBufferedRecordIterator() 
throws IOException {
+  private ClosableIterator<BufferedRecord<T>> 
getBufferedRecordIterator(IteratorMode iteratorMode) throws IOException {
+    this.readerContext.setIteratorMode(iteratorMode);
     initRecordIterators();
     return new HoodieFileGroupReaderIterator<>(this);
   }
 
+  public ClosableIterator<BufferedRecord<T>> 
getClosableBufferedRecordIterator() throws IOException {
+    return getBufferedRecordIterator(IteratorMode.HOODIE_RECORD);
+  }
+
   public ClosableIterator<T> getClosableIterator() throws IOException {
-    this.readerContext.setIteratorMode(IteratorMode.ENGINE_RECORD);
-    return new CloseableMappingIterator<>(getBufferedRecordIterator(), 
BufferedRecord::getRecord);
+    return new 
CloseableMappingIterator<>(getBufferedRecordIterator(IteratorMode.ENGINE_RECORD),
 BufferedRecord::getRecord);
   }
 
   /**
    * @return An iterator over the records that wraps the engine-specific 
record in a HoodieRecord.
    */
   public ClosableIterator<HoodieRecord<T>> getClosableHoodieRecordIterator() 
throws IOException {
-    this.readerContext.setIteratorMode(IteratorMode.HOODIE_RECORD);
-    return new CloseableMappingIterator<>(getBufferedRecordIterator(),
+    return new 
CloseableMappingIterator<>(getBufferedRecordIterator(IteratorMode.HOODIE_RECORD),
         bufferedRecord -> 
readerContext.getRecordContext().constructHoodieRecord(bufferedRecord));
   }
 
@@ -303,8 +306,7 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
    * @return A record key iterator over the records.
    */
   public ClosableIterator<String> getClosableKeyIterator() throws IOException {
-    this.readerContext.setIteratorMode(IteratorMode.RECORD_KEY);
-    return new CloseableMappingIterator<>(getBufferedRecordIterator(), 
BufferedRecord::getRecordKey);
+    return new 
CloseableMappingIterator<>(getBufferedRecordIterator(IteratorMode.RECORD_KEY), 
BufferedRecord::getRecordKey);
   }
 
   public ClosableIterator<BufferedRecord<T>> getLogRecordsOnly() throws 
IOException {
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/KeyBasedFileGroupRecordBuffer.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/KeyBasedFileGroupRecordBuffer.java
index 75fc3b7fd024..8fe84cbc2042 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/KeyBasedFileGroupRecordBuffer.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/KeyBasedFileGroupRecordBuffer.java
@@ -156,4 +156,8 @@ public class KeyBasedFileGroupRecordBuffer<T> extends 
FileGroupRecordBuffer<T> {
     // Handle records solely from log files.
     return hasNextLogRecord();
   }
+
+  public boolean isPartialMergingEnabled() {
+    return enablePartialMerging;
+  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
index 99e3fb23b594..18cfbf05fe9d 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
@@ -26,20 +26,20 @@ import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMemoryConfig, 
HoodieMetadataConfig, HoodieReaderConfig, TypedProperties}
 import 
org.apache.hudi.common.config.HoodieCommonConfig.{DISK_MAP_BITCASK_COMPRESSION_ENABLED,
 SPILLABLE_DISK_MAP_TYPE}
 import org.apache.hudi.common.config.HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH
-import org.apache.hudi.common.engine.HoodieReaderContext
 import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.model.{FileSlice, HoodieLogFile, 
HoodieRecordMerger, HoodieSparkRecord}
+import org.apache.hudi.common.model.{FileSlice, HoodieLogFile, 
HoodieRecordMerger}
+import 
org.apache.hudi.common.model.HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID
 import org.apache.hudi.common.serialization.DefaultSerializer
-import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.{HoodieTableMetaClient, PartialUpdateMode}
 import org.apache.hudi.common.table.cdc.{HoodieCDCFileSplit, HoodieCDCUtils}
 import org.apache.hudi.common.table.cdc.HoodieCDCInferenceCase._
 import org.apache.hudi.common.table.cdc.HoodieCDCOperation._
 import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode._
 import org.apache.hudi.common.table.log.{HoodieCDCLogRecordIterator, 
HoodieMergedLogRecordReader}
-import org.apache.hudi.common.table.read.{BufferedRecord, 
FileGroupReaderSchemaHandler, HoodieFileGroupReader, HoodieReadStats, 
UpdateProcessor}
+import org.apache.hudi.common.table.read.{BufferedRecord, 
BufferedRecordMerger, BufferedRecordMergerFactory, BufferedRecords, 
FileGroupReaderSchemaHandler, HoodieFileGroupReader, HoodieReadStats, 
IteratorMode, UpdateProcessor}
 import org.apache.hudi.common.table.read.buffer.KeyBasedFileGroupRecordBuffer
-import org.apache.hudi.common.util.{DefaultSizeEstimator, FileIOUtils, Option}
-import org.apache.hudi.common.util.collection.{ExternalSpillableMap, 
ImmutablePair}
+import org.apache.hudi.common.util.{DefaultSizeEstimator, FileIOUtils, 
HoodieRecordUtils, Option}
+import org.apache.hudi.common.util.collection.ExternalSpillableMap
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.data.CloseableIteratorListener
 import org.apache.hudi.storage.{StorageConfiguration, StoragePath}
@@ -48,6 +48,7 @@ import org.apache.avro.Schema
 import org.apache.avro.generic.GenericRecord
 import org.apache.hadoop.conf.Configuration
 import 
org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection
+import org.apache.spark.sql.HoodieInternalRowUtils
 import org.apache.spark.sql.avro.HoodieAvroDeserializer
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Projection
@@ -75,6 +76,26 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
   extends Iterator[InternalRow]
   with SparkAdapterSupport with AvroDeserializerSupport with Closeable {
 
+  private lazy val readerContext = {
+    val bufferedReaderContext = new 
SparkFileFormatInternalRowReaderContext(baseFileReader,
+      Seq.empty, Seq.empty, conf, metaClient.getTableConfig)
+    bufferedReaderContext.initRecordMerger(readerProperties)
+    bufferedReaderContext
+  }
+
+  private lazy val orderingFieldNames = 
HoodieRecordUtils.getOrderingFieldNames(readerContext.getMergeMode, props, 
metaClient)
+  private lazy val payloadClass: Option[String] = if 
(recordMerger.getMergingStrategy == PAYLOAD_BASED_MERGE_STRATEGY_UUID) {
+    Option.of(metaClient.getTableConfig.getPayloadClass)
+  } else {
+    Option.empty.asInstanceOf[Option[String]]
+  }
+  private lazy val partialUpdateMode: PartialUpdateMode = 
metaClient.getTableConfig.getPartialUpdateMode
+  private var isPartialMergeEnabled = false
+  private var bufferedRecordMerger = getBufferedRecordMerger
+  private def getBufferedRecordMerger: BufferedRecordMerger[InternalRow] = 
BufferedRecordMergerFactory.create(readerContext,
+    readerContext.getMergeMode, isPartialMergeEnabled, 
Option.of(recordMerger), orderingFieldNames,
+    payloadClass, avroSchema, props, partialUpdateMode)
+
   private lazy val storage = metaClient.getStorage
 
   private lazy val basePath = metaClient.getBasePath
@@ -100,11 +121,7 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
     readerProps
   }
 
-  private lazy val recordMerger: HoodieRecordMerger = {
-    val readerContext: HoodieReaderContext[InternalRow] = new 
SparkFileFormatInternalRowReaderContext(baseFileReader, Seq.empty, Seq.empty, 
conf, metaClient.getTableConfig)
-    readerContext.initRecordMerger(props)
-    readerContext.getRecordMerger.get()
-  }
+  private lazy val recordMerger: HoodieRecordMerger = 
readerContext.getRecordMerger().get()
 
   protected override val avroSchema: Schema = new 
Schema.Parser().parse(originTableSchema.avroSchemaStr)
 
@@ -145,13 +162,14 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
    * 2) when the type of cdc file is 'REPLACE_COMMIT',
    * use this to trace the records that are converted from the 
'[[beforeImageRecords]]
    */
-  private var recordIter: Iterator[InternalRow] = Iterator.empty
+  private var recordIter: Iterator[BufferedRecord[InternalRow]] = 
Iterator.empty
 
   /**
    * Only one case where it will be used is that extract the change data from 
log files for mor table.
    * At the time, 'logRecordIter' will work with [[beforeImageRecords]] that 
keep all the records of the previous file slice.
    */
   private var logRecordIter: Iterator[BufferedRecord[InternalRow]] = 
Iterator.empty
+  private var keyBasedFileGroupRecordBuffer: 
Option[KeyBasedFileGroupRecordBuffer[InternalRow]] = 
Option.empty.asInstanceOf[Option[KeyBasedFileGroupRecordBuffer[InternalRow]]]
 
   /**
    * Only one case where it will be used is that extract the change data from 
cdc log files.
@@ -175,26 +193,26 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
    * 1) the cdc infer case is [[LOG_FILE]];
    * 2) the cdc infer case is [[AS_IS]] and [[cdcSupplementalLoggingMode]] is 
'op_key'.
    */
-  private val beforeImageRecords: mutable.Map[String, InternalRow] = 
mutable.Map.empty
+  private val beforeImageRecords: mutable.Map[String, 
BufferedRecord[InternalRow]] = mutable.Map.empty
 
   /**
    * Keep the after-image data. Only one case will use this:
    * the cdc infer case is [[AS_IS]] and [[cdcSupplementalLoggingMode]] is 
[[OP_KEY_ONLY]] or [[DATA_BEFORE]].
    */
-  private val afterImageRecords: util.Map[String, InternalRow] = new 
ExternalSpillableMap[String, InternalRow](
+  private val afterImageRecords: util.Map[String, BufferedRecord[InternalRow]] 
= new ExternalSpillableMap[String, BufferedRecord[InternalRow]](
     
props.getLong(HoodieWriteConfig.CDC_FILE_GROUP_ITERATOR_MEMORY_SPILL_BYTES.key(),
       
HoodieWriteConfig.CDC_FILE_GROUP_ITERATOR_MEMORY_SPILL_BYTES.defaultValue()),
     props.getString(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.key, 
FileIOUtils.getDefaultSpillableMapBasePath),
     new DefaultSizeEstimator[String],
-    new DefaultSizeEstimator[InternalRow],
+    new DefaultSizeEstimator[BufferedRecord[InternalRow]],
     ExternalSpillableMap.DiskMapType.valueOf(props.getString(
       SPILLABLE_DISK_MAP_TYPE.key(), 
SPILLABLE_DISK_MAP_TYPE.defaultValue().toString)
       .toUpperCase(Locale.ROOT)),
-    new DefaultSerializer[InternalRow],
+    new DefaultSerializer[BufferedRecord[InternalRow]],
     props.getBoolean(DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), 
DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()),
     getClass.getSimpleName)
 
-  private val internalRowToJsonStringConverter = new 
InternalRowToJsonStringConverter(originTableSchema)
+  private val internalRowToJsonStringConverterMap: mutable.Map[Integer, 
InternalRowToJsonStringConverter] = mutable.Map.empty
 
   private def needLoadNextFile: Boolean = {
     !recordIter.hasNext &&
@@ -243,11 +261,11 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
     currentCDCFileSplit.getCdcInferCase match {
       case BASE_FILE_INSERT =>
         val originRecord = recordIter.next()
-        recordToLoad.update(3, convertRowToJsonString(originRecord))
+        recordToLoad.update(3, convertBufferedRecordToJsonString(originRecord))
         loaded = true
       case BASE_FILE_DELETE =>
         val originRecord = recordIter.next()
-        recordToLoad.update(2, convertRowToJsonString(originRecord))
+        recordToLoad.update(2, convertBufferedRecordToJsonString(originRecord))
         loaded = true
       case LOG_FILE =>
         loaded = loadNextLogRecord()
@@ -269,9 +287,9 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
             recordToLoad.update(2, recordToJsonAsUTF8String(before))
             parse(op) match {
               case INSERT =>
-                recordToLoad.update(3, 
convertRowToJsonString(afterImageRecords.get(recordKey)))
+                recordToLoad.update(3, 
convertBufferedRecordToJsonString(afterImageRecords.get(recordKey)))
               case UPDATE =>
-                recordToLoad.update(3, 
convertRowToJsonString(afterImageRecords.get(recordKey)))
+                recordToLoad.update(3, 
convertBufferedRecordToJsonString(afterImageRecords.get(recordKey)))
               case _ =>
                 recordToLoad.update(3, null)
             }
@@ -283,19 +301,19 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
             parse(op) match {
               case INSERT =>
                 recordToLoad.update(2, null)
-                recordToLoad.update(3, 
convertRowToJsonString(afterImageRecords.get(recordKey)))
+                recordToLoad.update(3, 
convertBufferedRecordToJsonString(afterImageRecords.get(recordKey)))
               case UPDATE =>
-                recordToLoad.update(2, 
convertRowToJsonString(beforeImageRecords(recordKey)))
-                recordToLoad.update(3, 
convertRowToJsonString(afterImageRecords.get(recordKey)))
+                recordToLoad.update(2, 
convertBufferedRecordToJsonString(beforeImageRecords(recordKey)))
+                recordToLoad.update(3, 
convertBufferedRecordToJsonString(afterImageRecords.get(recordKey)))
               case _ =>
-                recordToLoad.update(2, 
convertRowToJsonString(beforeImageRecords(recordKey)))
+                recordToLoad.update(2, 
convertBufferedRecordToJsonString(beforeImageRecords(recordKey)))
                 recordToLoad.update(3, null)
             }
         }
         loaded = true
       case REPLACE_COMMIT =>
         val originRecord = recordIter.next()
-        recordToLoad.update(2, convertRowToJsonString(originRecord))
+        recordToLoad.update(2, convertBufferedRecordToJsonString(originRecord))
         loaded = true
     }
     loaded
@@ -315,7 +333,7 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
       } else {
         // there is a real record deleted.
         recordToLoad.update(0, CDCRelation.CDC_OPERATION_DELETE)
-        recordToLoad.update(2, convertRowToJsonString(existingRecordOpt.get))
+        recordToLoad.update(2, 
convertBufferedRecordToJsonString(existingRecordOpt.get))
         recordToLoad.update(3, null)
         loaded = true
       }
@@ -325,18 +343,18 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
         // a new record is inserted.
         recordToLoad.update(0, CDCRelation.CDC_OPERATION_INSERT)
         recordToLoad.update(2, null)
-        recordToLoad.update(3, convertRowToJsonString(logRecord.getRecord))
+        recordToLoad.update(3, convertBufferedRecordToJsonString(logRecord))
         // insert into beforeImageRecords
-        beforeImageRecords(logRecord.getRecordKey) = logRecord.getRecord
+        beforeImageRecords(logRecord.getRecordKey) = logRecord
         loaded = true
       } else {
         // a existed record is updated.
         val existingRecord = existingRecordOpt.get
-        val mergeRecord = merge(existingRecord, 
logRecord.getRecord).getLeft.getData
+        val mergeRecord = merge(existingRecord, logRecord)
         if (existingRecord != mergeRecord) {
           recordToLoad.update(0, CDCRelation.CDC_OPERATION_UPDATE)
-          recordToLoad.update(2, convertRowToJsonString(existingRecord))
-          recordToLoad.update(3, convertRowToJsonString(mergeRecord))
+          recordToLoad.update(2, 
convertBufferedRecordToJsonString(existingRecord))
+          recordToLoad.update(3, 
convertBufferedRecordToJsonString(mergeRecord))
           // update into beforeImageRecords
           beforeImageRecords(logRecord.getRecordKey) = mergeRecord
           loaded = true
@@ -350,6 +368,8 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
     // reset all the iterator first.
     recordIter = Iterator.empty
     logRecordIter = Iterator.empty
+    keyBasedFileGroupRecordBuffer.ifPresent(k => k.close())
+    keyBasedFileGroupRecordBuffer = 
Option.empty.asInstanceOf[Option[KeyBasedFileGroupRecordBuffer[InternalRow]]]
     beforeImageRecords.clear()
     afterImageRecords.clear()
     if (cdcLogRecordIterator != null) {
@@ -371,6 +391,7 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
             InternalRow.empty, absCDCPath, 0, fileStatus.getLength)
           recordIter = baseFileReader.read(pf, 
originTableSchema.structTypeSchema, new StructType(),
             toJavaOption(originTableSchema.internalSchema), Seq.empty, conf)
+            .map(record => BufferedRecords.fromEngineRecord(record, 
avroSchema, readerContext.getRecordContext, orderingFieldNames, false))
         case BASE_FILE_DELETE =>
           assert(currentCDCFileSplit.getBeforeFileSlice.isPresent)
           recordIter = 
loadFileSlice(currentCDCFileSplit.getBeforeFileSlice.get)
@@ -389,10 +410,10 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
           }
           // load afterFileSlice to afterImageRecords
           if (currentCDCFileSplit.getAfterFileSlice.isPresent) {
-            val iter = 
loadFileSliceWithKeys(currentCDCFileSplit.getAfterFileSlice.get())
+            val iter = 
loadFileSlice(currentCDCFileSplit.getAfterFileSlice.get())
             afterImageRecords.clear()
-            iter.foreach { tuple =>
-              afterImageRecords.put(tuple._1, tuple._2)
+            iter.foreach { bufferedRecord =>
+              afterImageRecords.put(bufferedRecord.getRecordKey, 
bufferedRecord)
             }
           }
 
@@ -453,9 +474,9 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
     if (!same) {
       // clear up the beforeImageRecords
       beforeImageRecords.clear()
-      val iter = loadFileSliceWithKeys(fileSlice)
-      iter.foreach { tuple =>
-        beforeImageRecords.put(tuple._1, tuple._2)
+      val iter = loadFileSlice(fileSlice)
+      iter.foreach { bufferedRecord =>
+        beforeImageRecords.put(bufferedRecord.getRecordKey, bufferedRecord)
       }
       // reset beforeImageFiles
       beforeImageFiles.clear()
@@ -463,22 +484,11 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
     }
   }
 
-  private def loadFileSliceWithKeys(fileSlice: FileSlice): Iterator[(String, 
InternalRow)] = {
-    val readerContext = new 
SparkFileFormatInternalRowReaderContext(baseFileReader, Seq.empty, Seq.empty,
-      conf, metaClient.getTableConfig)
-    loadFileSlice(fileSlice, readerContext).map(internalRow => {
-      val recordKey = 
readerContext.getRecordContext().getRecordKey(internalRow, avroSchema)
-      (recordKey, internalRow)
-    })
-  }
-
-  private def loadFileSlice(fileSlice: FileSlice): Iterator[InternalRow] = {
-    val readerContext = new 
SparkFileFormatInternalRowReaderContext(baseFileReader, Seq.empty, Seq.empty,
-      conf, metaClient.getTableConfig)
+  private def loadFileSlice(fileSlice: FileSlice): 
Iterator[BufferedRecord[InternalRow]] = {
     loadFileSlice(fileSlice, readerContext)
   }
 
-  private def loadFileSlice(fileSlice: FileSlice, readerContext: 
SparkFileFormatInternalRowReaderContext): Iterator[InternalRow] = {
+  private def loadFileSlice(fileSlice: FileSlice, readerContext: 
SparkFileFormatInternalRowReaderContext): Iterator[BufferedRecord[InternalRow]] 
= {
     val fileGroupReader = HoodieFileGroupReader.newBuilder()
       .withReaderContext(readerContext)
       .withHoodieTableMetaClient(metaClient)
@@ -489,24 +499,22 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
       .withProps(readerProperties)
       .withLatestCommitTime(split.changes.last.getInstant)
       .build()
-    
CloseableIteratorListener.addListener(fileGroupReader.getClosableIterator).asScala
+    
CloseableIteratorListener.addListener(fileGroupReader.getClosableBufferedRecordIterator).asScala
   }
 
   private def loadLogFile(logFile: HoodieLogFile, instant: String): 
Iterator[BufferedRecord[InternalRow]] = {
     val partitionPath = 
FSUtils.getRelativePartitionPath(metaClient.getBasePath, 
logFile.getPath.getParent)
-    val readerContext = new 
SparkFileFormatInternalRowReaderContext(baseFileReader, Seq.empty, Seq.empty,
-      conf, metaClient.getTableConfig)
     readerContext.setLatestCommitTime(instant)
     readerContext.setHasBootstrapBaseFile(false)
     readerContext.setHasLogFiles(true)
-    readerContext.initRecordMerger(readerProperties)
     readerContext.setSchemaHandler(
       new FileGroupReaderSchemaHandler[InternalRow](readerContext, avroSchema, 
avroSchema,
         Option.empty(), metaClient.getTableConfig, readerProperties))
     val stats = new HoodieReadStats
-    val recordBuffer = new 
KeyBasedFileGroupRecordBuffer[InternalRow](readerContext, metaClient, 
readerContext.getMergeMode,
+    keyBasedFileGroupRecordBuffer.ifPresent(k => k.close())
+    keyBasedFileGroupRecordBuffer = Option.of(new 
KeyBasedFileGroupRecordBuffer[InternalRow](readerContext, metaClient, 
readerContext.getMergeMode,
       metaClient.getTableConfig.getPartialUpdateMode, readerProperties, 
metaClient.getTableConfig.getPreCombineFields,
-      UpdateProcessor.create(stats, readerContext, true, Option.empty()))
+      UpdateProcessor.create(stats, readerContext, true, Option.empty())))
 
     HoodieMergedLogRecordReader.newBuilder[InternalRow]
       .withStorage(metaClient.getStorage)
@@ -516,17 +524,19 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
       
.withBufferSize(HoodieMetadataConfig.MAX_READER_BUFFER_SIZE_PROP.defaultValue)
       .withPartition(partitionPath)
       .withMetaClient(metaClient)
-      .withRecordBuffer(recordBuffer)
+      .withRecordBuffer(keyBasedFileGroupRecordBuffer.get())
       .build
 
-    
CloseableIteratorListener.addListener(recordBuffer.getLogRecordIterator).asScala
+    
CloseableIteratorListener.addListener(keyBasedFileGroupRecordBuffer.get().getLogRecordIterator).asScala
   }
 
   /**
    * Convert InternalRow to json string.
    */
-  private def convertRowToJsonString(record: InternalRow): UTF8String = {
-    internalRowToJsonStringConverter.convert(record)
+  private def convertBufferedRecordToJsonString(record: 
BufferedRecord[InternalRow]): UTF8String = {
+    internalRowToJsonStringConverterMap.getOrElseUpdate(record.getSchemaId,
+      new 
InternalRowToJsonStringConverter(HoodieInternalRowUtils.getCachedSchema(readerContext.getRecordContext.decodeAvroSchema(record.getSchemaId))))
+      .convert(record.getRecord)
   }
 
   /**
@@ -540,15 +550,24 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
     convertToUTF8String(HoodieCDCUtils.recordToJson(record))
   }
 
-  private def merge(currentRecord: InternalRow, newRecord: InternalRow): 
ImmutablePair[HoodieSparkRecord, Schema] = {
-    recordMerger.merge(new HoodieSparkRecord(currentRecord, structTypeSchema), 
avroSchema,
-      new HoodieSparkRecord(newRecord, structTypeSchema), avroSchema, props)
-      .get().asInstanceOf[ImmutablePair[HoodieSparkRecord, Schema]]
+  private def merge(currentRecord: BufferedRecord[InternalRow], newRecord: 
BufferedRecord[InternalRow]): BufferedRecord[InternalRow] = {
+    if (!isPartialMergeEnabled && keyBasedFileGroupRecordBuffer.isPresent && 
keyBasedFileGroupRecordBuffer.get().isPartialMergingEnabled) {
+      isPartialMergeEnabled = true
+      bufferedRecordMerger = getBufferedRecordMerger
+    }
+    val deltaMergeResult = bufferedRecordMerger.deltaMerge(newRecord, 
currentRecord)
+    if (deltaMergeResult.isEmpty) {
+      currentRecord
+    } else {
+      deltaMergeResult.get()
+    }
   }
 
   override def close(): Unit = {
     recordIter = Iterator.empty
     logRecordIter = Iterator.empty
+    keyBasedFileGroupRecordBuffer.ifPresent(k => k.close())
+    keyBasedFileGroupRecordBuffer = 
Option.empty.asInstanceOf[Option[KeyBasedFileGroupRecordBuffer[InternalRow]]]
     beforeImageRecords.clear()
     afterImageRecords.clear()
     if (cdcLogRecordIterator != null) {
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
index 0f5f90d49efc..edb5ea7ff10a 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
@@ -238,7 +238,7 @@ class HoodieCDCRDD(
      */
     private var afterImageRecords: mutable.Map[String, InternalRow] = 
mutable.Map.empty
 
-    private var internalRowToJsonStringConverter = new 
InternalRowToJsonStringConverter(originTableSchema)
+    private var internalRowToJsonStringConverter = new 
InternalRowToJsonStringConverter(originTableSchema.structTypeSchema)
 
     private def needLoadNextFile: Boolean = {
       !recordIter.hasNext &&
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/InternalRowToJsonStringConverter.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/InternalRowToJsonStringConverter.scala
index 2bdb07418c9f..62dd08bebe00 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/InternalRowToJsonStringConverter.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/InternalRowToJsonStringConverter.scala
@@ -18,16 +18,14 @@
 
 package org.apache.hudi.cdc
 
-import org.apache.hudi.HoodieTableSchema
-
 import com.fasterxml.jackson.annotation.JsonInclude.Include
 import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
 import com.fasterxml.jackson.module.scala.DefaultScalaModule
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.types.StringType
+import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.unsafe.types.UTF8String
 
-class InternalRowToJsonStringConverter(originTableSchema: HoodieTableSchema) {
+class InternalRowToJsonStringConverter(schema: StructType) {
 
   private lazy val mapper: ObjectMapper = {
     val _mapper = new ObjectMapper
@@ -39,7 +37,7 @@ class InternalRowToJsonStringConverter(originTableSchema: 
HoodieTableSchema) {
 
   def convert(record: InternalRow): UTF8String = {
     val map = scala.collection.mutable.Map.empty[String, Any]
-    originTableSchema.structTypeSchema.zipWithIndex.foreach {
+    schema.zipWithIndex.foreach {
       case (field, idx) =>
         if (field.dataType.isInstanceOf[StringType]) {
           map(field.name) = 
Option(record.getUTF8String(idx)).map(_.toString).orNull
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/hudi/cdc/TestInternalRowToJsonStringConverter.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/hudi/cdc/TestInternalRowToJsonStringConverter.scala
index 8104b5470b70..90ed213278af 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/hudi/cdc/TestInternalRowToJsonStringConverter.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/hudi/cdc/TestInternalRowToJsonStringConverter.scala
@@ -28,11 +28,11 @@ import org.junit.jupiter.api.Assertions.{assertEquals, 
assertTrue}
 import org.junit.jupiter.api.Test
 
 class TestInternalRowToJsonStringConverter {
-  private val converter = new 
InternalRowToJsonStringConverter(hoodieTableSchema)
+  private val converter = new 
InternalRowToJsonStringConverter(hoodieTableSchema.structTypeSchema)
 
   @Test
   def emptyRow(): Unit = {
-    val converter = new 
InternalRowToJsonStringConverter(emptyHoodieTableSchema)
+    val converter = new 
InternalRowToJsonStringConverter(emptyHoodieTableSchema.structTypeSchema)
     val row = InternalRow.empty
     val converted = converter.convert(row)
     assertEquals("{}", converted.toString)
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/TestCDCForSparkSQL.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/TestCDCForSparkSQL.scala
index 31e101302880..7c6304dfa49f 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/TestCDCForSparkSQL.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/TestCDCForSparkSQL.scala
@@ -20,8 +20,11 @@
 package org.apache.spark.sql.hudi.feature
 
 import org.apache.hudi.DataSourceReadOptions._
-import org.apache.hudi.DataSourceWriteOptions.SPARK_SQL_INSERT_INTO_OPERATION
+import org.apache.hudi.DataSourceWriteOptions
+import 
org.apache.hudi.DataSourceWriteOptions.{ENABLE_MERGE_INTO_PARTIAL_UPDATES, 
SPARK_SQL_INSERT_INTO_OPERATION}
+import org.apache.hudi.common.config.HoodieReaderConfig
 import 
org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode.{DATA_BEFORE, 
DATA_BEFORE_AFTER, OP_KEY_ONLY}
+import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient
 
 import org.apache.spark.sql.DataFrame
@@ -304,4 +307,85 @@ class TestCDCForSparkSQL extends HoodieSparkSqlTestBase {
       }
     }
   }
+
+  test("Test Partial Updates With Spark CDC") {
+    val databaseName = "hudi_database"
+    spark.sql(s"create database if not exists $databaseName")
+    spark.sql(s"use $databaseName")
+    withSQLConf(HoodieWriteConfig.MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key 
-> "0",
+      DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key -> "true",
+      HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key -> "true") {
+      Seq(OP_KEY_ONLY, DATA_BEFORE, DATA_BEFORE_AFTER).foreach { loggingMode =>
+        withTempDir { tmp =>
+          val tableName = generateTableName
+          val basePath = s"${tmp.getCanonicalPath}/$tableName"
+          spark.sql(
+            s"""
+               | create table $tableName (
+               |  id int,
+               |  name string,
+               |  price double,
+               |  ts long,
+               |  pt string
+               | ) using hudi
+               | partitioned by (pt)
+               | tblproperties (
+               |   'primaryKey' = 'id',
+               |   'preCombineField' = 'ts',
+               |   'hoodie.table.cdc.enabled' = 'true',
+               |   'hoodie.table.cdc.supplemental.logging.mode' = 
'${loggingMode.name()}',
+               |   '${ENABLE_MERGE_INTO_PARTIAL_UPDATES.key}' = 'true',
+               |   'type' = 'mor'
+               | )
+               | location '$basePath'
+      """.stripMargin)
+
+          val metaClient = createMetaClient(spark, basePath)
+
+          spark.sql(
+            s"""
+               | insert into $tableName values
+               | (1, 'a1', 11, 1000, '2021'),
+               | (2, 'a2', 12, 1000, '2022'),
+               | (3, 'a3', 13, 1000, '2022')
+      """.stripMargin)
+          val commitTime1 = 
metaClient.reloadActiveTimeline.lastInstant().get().requestedTime
+          val cdcDataOnly1 = cdcDataFrame(basePath, commitTime1.toLong - 1)
+          cdcDataOnly1.show(false)
+          assertCDCOpCnt(cdcDataOnly1, 3, 0, 0)
+
+          spark.sql(s"insert overwrite table $tableName partition (pt = 
'2021') values (1, 'a1_v2', 11, 1100)")
+          val commitTime2 = 
metaClient.reloadActiveTimeline.lastInstant().get().requestedTime
+          val cdcDataOnly2 = cdcDataFrame(basePath, commitTime2.toLong - 1)
+          cdcDataOnly2.show(false)
+          assertCDCOpCnt(cdcDataOnly2, 1, 0, 1)
+
+          spark.sql(s"update $tableName set name = 'a2_v2', ts = 1200 where id 
= 2")
+          val commitTime3 = 
metaClient.reloadActiveTimeline.lastInstant().get().requestedTime
+          val cdcDataOnly3 = cdcDataFrame(basePath, commitTime3.toLong - 1)
+          cdcDataOnly3.show(false)
+          assertCDCOpCnt(cdcDataOnly3, 0, 1, 0)
+
+          spark.sql(
+            s"""
+               | merge into $tableName
+               | using (
+               |  select * from (
+               |  select 1 as id, 'a1_v3' as name, cast(1300 as long) as ts, 
"2021" as pt
+               |  )
+               | ) s0
+               | on s0.id = $tableName.id
+               | when matched then update set id = s0.id, name = s0.name, ts = 
s0.ts, pt = s0.pt
+      """.stripMargin)
+          val commitTime4 = 
metaClient.reloadActiveTimeline.lastInstant().get().requestedTime
+          val cdcDataOnly4 = cdcDataFrame(basePath, commitTime4.toLong - 1)
+          cdcDataOnly4.show(false)
+          assertCDCOpCnt(cdcDataOnly4, 0, 1, 0)
+
+          val totalCdcData = cdcDataFrame(basePath, commitTime1.toLong - 1)
+          assertCDCOpCnt(totalCdcData, 4, 2, 1)
+        }
+      }
+    }
+  }
 }

Reply via email to