This is an automated email from the ASF dual-hosted git repository.

yuzhaojing pushed a commit to branch release-feature-rfc46
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 517ca54fe2fda67158ecbbc01d1e096441eccf1a
Author: komao <[email protected]>
AuthorDate: Thu Sep 22 11:17:54 2022 +0800

    [RFC-46][HUDI-4414] Update the RFC-46 doc to fix comments feedback (#6132)
    
    * Update the RFC-46 doc to fix comments feedback
    
    * fix
    
    Co-authored-by: wangzixuan.wzxuan <[email protected]>
---
 .../apache/hudi/client/BaseHoodieWriteClient.java  |   2 +-
 .../java/org/apache/hudi/io/HoodieCDCLogger.java   |   5 +-
 .../java/org/apache/hudi/io/HoodieMergeHandle.java |   5 +-
 .../apache/hudi/io/HoodieMergeHandleFactory.java   |   5 +-
 .../hudi/io/HoodieMergeHandleWithChangeLog.java    |  23 ++-
 .../io/HoodieSortedMergeHandleWithChangeLog.java   |  15 +-
 .../hudi/table/action/commit/BaseWriteHelper.java  |   4 +-
 .../table/action/commit/HoodieWriteHelper.java     |   1 -
 .../apache/hudi/client/HoodieFlinkWriteClient.java |   1 -
 .../FlinkSizeBasedClusteringPlanStrategy.java      |   3 +-
 .../FlinkMergeAndReplaceHandleWithChangeLog.java   |  19 +--
 .../hudi/io/FlinkMergeHandleWithChangeLog.java     |  19 +--
 .../apache/hudi/io/FlinkWriteHandleFactory.java    |  23 ++-
 .../hudi/table/action/commit/FlinkWriteHelper.java |   1 -
 .../hudi/table/action/commit/JavaWriteHelper.java  |   1 -
 .../MultipleSparkJobExecutionStrategy.java         |   2 +-
 .../bulkinsert/RDDSpatialCurveSortPartitioner.java |   2 -
 .../hudi/HoodieDatasetBulkInsertHelper.scala       |   2 +-
 .../TestHoodieClientOnCopyOnWriteStorage.java      |   8 +-
 .../table/log/HoodieCDCLogRecordIterator.java      |   6 +-
 .../table/log/block/HoodieAvroDataBlock.java       |   3 +
 .../common/table/log/block/HoodieCDCDataBlock.java |   7 +-
 .../org/apache/hudi/sink/StreamWriteFunction.java  |   2 +-
 .../table/format/mor/MergeOnReadInputFormat.java   |   4 +-
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |   3 +-
 .../scala/org/apache/hudi/LogFileIterator.scala    | 103 ++++++++-----
 .../scala/org/apache/hudi/cdc/HoodieCDCRDD.scala   |  22 +--
 .../apache/hudi/functional/TestCOWDataSource.scala |   5 +-
 .../hudi/functional/cdc/HoodieCDCTestBase.scala    |  12 +-
 rfc/rfc-46/rfc-46.md                               | 169 ++++++++++++++++-----
 30 files changed, 310 insertions(+), 167 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 851d201f63..1984f325f0 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -119,7 +119,7 @@ import static 
org.apache.hudi.common.model.HoodieCommitMetadata.SCHEMA_KEY;
  * Abstract Write Client providing functionality for performing commit, index 
updates and rollback
  * Reused for regular write operations like upsert/insert/bulk-insert.. as 
well as bootstrap
  *
- * @param <T> Sub type of HoodieRecordPayload
+ * @param <T> Type of data
  * @param <I> Type of inputs
  * @param <K> Type of keys
  * @param <O> Type of outputs
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java
index 303eea76db..50b87e3989 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java
@@ -20,6 +20,7 @@ package org.apache.hudi.io;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieAvroPayload;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieWriteStat;
@@ -145,10 +146,10 @@ public class HoodieCDCLogger implements Closeable {
     }
 
     try {
-      List<IndexedRecord> records = cdcData.values().stream()
+      List<HoodieRecord> records = cdcData.values().stream()
           .map(record -> {
             try {
-              return record.getInsertValue(cdcSchema).get();
+              return new 
HoodieAvroIndexedRecord(record.getInsertValue(cdcSchema).get());
             } catch (IOException e) {
               throw new HoodieIOException("Failed to get cdc record", e);
             }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index fef8fa8853..0933d9f28b 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -267,7 +267,7 @@ public class HoodieMergeHandle<T, I, K, O> extends 
HoodieWriteHandle<T, I, K, O>
         + ((ExternalSpillableMap) 
keyToNewRecords).getSizeOfFileOnDiskInBytes());
   }
 
-  protected boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, 
HoodieRecord<T> oldRecord, Option<HoodieRecord> combineRecordOp) throws 
IOException {
+  protected boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, 
HoodieRecord oldRecord, Option<HoodieRecord> combineRecordOp) throws 
IOException {
     boolean isDelete = false;
     Schema schema = useWriterSchemaForCompaction ? tableSchemaWithMetaFields : 
tableSchema;
     if (combineRecordOp.isPresent()) {
@@ -292,7 +292,8 @@ public class HoodieMergeHandle<T, I, K, O> extends 
HoodieWriteHandle<T, I, K, O>
     writeInsertRecord(hoodieRecord, Option.of(hoodieRecord), schema, 
config.getProps());
   }
 
-  protected void writeInsertRecord(HoodieRecord<T> hoodieRecord, 
Option<HoodieRecord> insertRecord, Schema schema, Properties prop) {
+  protected void writeInsertRecord(HoodieRecord<T> hoodieRecord, 
Option<HoodieRecord> insertRecord, Schema schema, Properties prop)
+      throws IOException {
     if (writeRecord(hoodieRecord, insertRecord, schema, prop, 
HoodieOperation.isDelete(hoodieRecord.getOperation()))) {
       insertRecordsWritten++;
     }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleFactory.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleFactory.java
index 436eff5dac..b110c2c081 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleFactory.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleFactory.java
@@ -21,7 +21,6 @@ package org.apache.hudi.io;
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -38,7 +37,7 @@ public class HoodieMergeHandleFactory {
   /**
    * Creates a merge handle for normal write path.
    */
-  public static <T extends HoodieRecordPayload, I, K, O> HoodieMergeHandle<T, 
I, K, O> create(
+  public static <T, I, K, O> HoodieMergeHandle<T, I, K, O> create(
       WriteOperationType operationType,
       HoodieWriteConfig writeConfig,
       String instantTime,
@@ -70,7 +69,7 @@ public class HoodieMergeHandleFactory {
   /**
    * Creates a merge handle for compaction path.
    */
-  public static <T extends HoodieRecordPayload, I, K, O> HoodieMergeHandle<T, 
I, K, O> create(
+  public static <T, I, K, O> HoodieMergeHandle<T, I, K, O> create(
       HoodieWriteConfig writeConfig,
       String instantTime,
       HoodieTable<T, I, K, O> table,
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
index 910bc42158..6e8fda0b10 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
@@ -20,10 +20,10 @@ package org.apache.hudi.io;
 
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
 import org.apache.hudi.common.table.log.AppendResult;
 import org.apache.hudi.common.util.Option;
@@ -31,9 +31,11 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.keygen.BaseKeyGenerator;
 import org.apache.hudi.table.HoodieTable;
 
+import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
 
+import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -41,7 +43,7 @@ import java.util.Map;
 /**
  * A merge handle that supports logging change logs.
  */
-public class HoodieMergeHandleWithChangeLog<T extends HoodieRecordPayload, I, 
K, O> extends HoodieMergeHandle<T, I, K, O> {
+public class HoodieMergeHandleWithChangeLog<T, I, K, O> extends 
HoodieMergeHandle<T, I, K, O> {
   protected final HoodieCDCLogger cdcLogger;
 
   public HoodieMergeHandleWithChangeLog(HoodieWriteConfig config, String 
instantTime, HoodieTable<T, I, K, O> hoodieTable,
@@ -73,19 +75,24 @@ public class HoodieMergeHandleWithChangeLog<T extends 
HoodieRecordPayload, I, K,
         IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config));
   }
 
-  protected boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, 
GenericRecord oldRecord, Option<IndexedRecord> indexedRecord) {
-    final boolean result = super.writeUpdateRecord(hoodieRecord, oldRecord, 
indexedRecord);
+  protected boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, 
HoodieRecord oldRecord, Option<HoodieRecord> combineRecordOp)
+      throws IOException {
+    final boolean result = super.writeUpdateRecord(hoodieRecord, oldRecord, 
combineRecordOp);
     if (result) {
       boolean isDelete = HoodieOperation.isDelete(hoodieRecord.getOperation());
-      cdcLogger.put(hoodieRecord, oldRecord, isDelete ? Option.empty() : 
indexedRecord);
+      cdcLogger.put(hoodieRecord, (GenericRecord) ((HoodieAvroIndexedRecord) 
oldRecord).getData(), isDelete ? Option.empty() : combineRecordOp.map(rec -> 
((HoodieAvroIndexedRecord) rec).getData()));
     }
     return result;
   }
 
-  protected void writeInsertRecord(HoodieRecord<T> hoodieRecord, 
Option<IndexedRecord> insertRecord) {
-    super.writeInsertRecord(hoodieRecord, insertRecord);
+  protected void writeInsertRecord(HoodieRecord<T> hoodieRecord) throws 
IOException {
+    // Get the data before deflated
+    Schema schema = useWriterSchemaForCompaction ? tableSchemaWithMetaFields : 
tableSchema;
+    Option<IndexedRecord> recordOption = hoodieRecord.toIndexedRecord(schema, 
this.config.getProps())
+        .map(HoodieRecord::getData);
+    super.writeInsertRecord(hoodieRecord);
     if (!HoodieOperation.isDelete(hoodieRecord.getOperation())) {
-      cdcLogger.put(hoodieRecord, null, insertRecord);
+      cdcLogger.put(hoodieRecord, null, recordOption);
     }
   }
 
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandleWithChangeLog.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandleWithChangeLog.java
index 8d317b709a..727765b3e2 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandleWithChangeLog.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandleWithChangeLog.java
@@ -19,23 +19,25 @@
 package org.apache.hudi.io;
 
 import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.keygen.BaseKeyGenerator;
 import org.apache.hudi.table.HoodieTable;
 
-import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.Schema;
 
+import java.io.IOException;
+import java.util.Properties;
 import java.util.Iterator;
 import java.util.Map;
 
 /**
  * A sorted merge handle that supports logging change logs.
  */
-public class HoodieSortedMergeHandleWithChangeLog<T extends 
HoodieRecordPayload, I, K, O> extends HoodieMergeHandleWithChangeLog<T, I, K, 
O> {
+public class HoodieSortedMergeHandleWithChangeLog<T, I, K, O> extends 
HoodieMergeHandleWithChangeLog<T, I, K, O> {
   public HoodieSortedMergeHandleWithChangeLog(HoodieWriteConfig config, String 
instantTime, HoodieTable<T, I, K, O> hoodieTable,
                                               Iterator<HoodieRecord<T>> 
recordItr, String partitionPath, String fileId,
                                               TaskContextSupplier 
taskContextSupplier, Option<BaseKeyGenerator> keyGeneratorOpt) {
@@ -51,9 +53,10 @@ public class HoodieSortedMergeHandleWithChangeLog<T extends 
HoodieRecordPayload,
     super(config, instantTime, hoodieTable, keyToNewRecords, partitionPath, 
fileId, dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);
   }
 
-  protected boolean writeRecord(HoodieRecord<T> hoodieRecord, 
Option<IndexedRecord> insertRecord) {
-    final boolean result = super.writeRecord(hoodieRecord, insertRecord);
-    this.cdcLogger.put(hoodieRecord, null, insertRecord);
+  protected boolean writeRecord(HoodieRecord<T> hoodieRecord, 
Option<HoodieRecord> insertRecord, Schema schema, Properties props)
+      throws IOException {
+    final boolean result = super.writeRecord(hoodieRecord, insertRecord, 
schema, props);
+    this.cdcLogger.put(hoodieRecord, null, insertRecord.map(rec -> 
((HoodieAvroIndexedRecord) rec).getData()));
     return result;
   }
 }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
index 4ac5b3d714..0b96529379 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
@@ -84,9 +84,9 @@ public abstract class BaseWriteHelper<T, I, K, O, R> {
   public I deduplicateRecords(
       I records, HoodieTable<T, I, K, O> table, int parallelism) {
     HoodieRecordMerger recordMerger = table.getConfig().getRecordMerger();
-    return deduplicateRecords(records, table.getIndex(), parallelism, 
table.getConfig().getSchema(), table.getConfig().getProps(), merge);
+    return deduplicateRecords(records, table.getIndex(), parallelism, 
table.getConfig().getSchema(), table.getConfig().getProps(), recordMerger);
   }
 
   public abstract I deduplicateRecords(
-      I records, HoodieIndex<?, ?> index, int parallelism, String schema, 
Properties props, HoodieMerge merge);
+      I records, HoodieIndex<?, ?> index, int parallelism, String schema, 
Properties props, HoodieRecordMerger merge);
 }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
index 0ed220dea9..bd495f69da 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
@@ -25,7 +25,6 @@ import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordMerger;
-import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.index.HoodieIndex;
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 9085f392a9..1da892d7c4 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -28,7 +28,6 @@ import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
-import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.TableServiceType;
 import org.apache.hudi.common.model.WriteOperationType;
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkSizeBasedClusteringPlanStrategy.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkSizeBasedClusteringPlanStrategy.java
index 891ddf8993..3abffe38d8 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkSizeBasedClusteringPlanStrategy.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkSizeBasedClusteringPlanStrategy.java
@@ -25,7 +25,6 @@ import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -48,7 +47,7 @@ import static 
org.apache.hudi.config.HoodieClusteringConfig.PLAN_STRATEGY_SORT_C
  * 1) Creates clustering groups based on max size allowed per group.
  * 2) Excludes files that are greater than 'small.file.limit' from clustering 
plan.
  */
-public class FlinkSizeBasedClusteringPlanStrategy<T extends 
HoodieRecordPayload<T>>
+public class FlinkSizeBasedClusteringPlanStrategy<T>
     extends PartitionAwareClusteringPlanStrategy<T, List<HoodieRecord<T>>, 
List<HoodieKey>, List<WriteStatus>> {
   private static final Logger LOG = 
LogManager.getLogger(FlinkSizeBasedClusteringPlanStrategy.class);
 
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandleWithChangeLog.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandleWithChangeLog.java
index 62b5481cf9..fa14ba0418 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandleWithChangeLog.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandleWithChangeLog.java
@@ -20,9 +20,9 @@ package org.apache.hudi.io;
 
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
 import org.apache.hudi.common.table.log.AppendResult;
 import org.apache.hudi.common.util.Option;
@@ -30,9 +30,9 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
 
 import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.fs.Path;
 
+import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
 
@@ -42,7 +42,7 @@ import java.util.List;
  * <p>The cdc about logic is copied from {@link 
HoodieMergeHandleWithChangeLog},
  * we should refactor it out when there are good abstractions.
  */
-public class FlinkMergeAndReplaceHandleWithChangeLog<T extends 
HoodieRecordPayload, I, K, O>
+public class FlinkMergeAndReplaceHandleWithChangeLog<T, I, K, O>
     extends FlinkMergeAndReplaceHandle<T, I, K, O> {
   private final HoodieCDCLogger cdcLogger;
 
@@ -59,19 +59,20 @@ public class FlinkMergeAndReplaceHandleWithChangeLog<T 
extends HoodieRecordPaylo
         IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config));
   }
 
-  protected boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, 
GenericRecord oldRecord, Option<IndexedRecord> indexedRecord) {
-    final boolean result = super.writeUpdateRecord(hoodieRecord, oldRecord, 
indexedRecord);
+  protected boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, 
HoodieRecord oldRecord, Option<HoodieRecord> combineRecordOp)
+      throws IOException {
+    final boolean result = super.writeUpdateRecord(hoodieRecord, oldRecord, 
combineRecordOp);
     if (result) {
       boolean isDelete = HoodieOperation.isDelete(hoodieRecord.getOperation());
-      cdcLogger.put(hoodieRecord, oldRecord, isDelete ? Option.empty() : 
indexedRecord);
+      cdcLogger.put(hoodieRecord, (GenericRecord) oldRecord.getData(), 
isDelete ? Option.empty() : combineRecordOp.map(rec -> 
((HoodieAvroIndexedRecord) rec).getData()));
     }
     return result;
   }
 
-  protected void writeInsertRecord(HoodieRecord<T> hoodieRecord, 
Option<IndexedRecord> insertRecord) {
-    super.writeInsertRecord(hoodieRecord, insertRecord);
+  protected void writeInsertRecord(HoodieRecord<T> hoodieRecord) throws 
IOException {
+    super.writeInsertRecord(hoodieRecord);
     if (!HoodieOperation.isDelete(hoodieRecord.getOperation())) {
-      cdcLogger.put(hoodieRecord, null, insertRecord);
+      cdcLogger.put(hoodieRecord, null, Option.of((GenericRecord) 
hoodieRecord.getData()));
     }
   }
 
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java
index f6adbbf0d4..b85c7c270f 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java
@@ -20,9 +20,9 @@ package org.apache.hudi.io;
 
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
 import org.apache.hudi.common.table.log.AppendResult;
 import org.apache.hudi.common.util.Option;
@@ -30,10 +30,10 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
 
 import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
+import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
 
@@ -43,7 +43,7 @@ import java.util.List;
  * <p>The cdc about logic is copied from {@link 
HoodieMergeHandleWithChangeLog},
  * we should refactor it out when there are good abstractions.
  */
-public class FlinkMergeHandleWithChangeLog<T extends HoodieRecordPayload, I, 
K, O>
+public class FlinkMergeHandleWithChangeLog<T, I, K, O>
     extends FlinkMergeHandle<T, I, K, O> {
   private final HoodieCDCLogger cdcLogger;
 
@@ -62,19 +62,20 @@ public class FlinkMergeHandleWithChangeLog<T extends 
HoodieRecordPayload, I, K,
         IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config));
   }
 
-  protected boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, 
GenericRecord oldRecord, Option<IndexedRecord> indexedRecord) {
-    final boolean result = super.writeUpdateRecord(hoodieRecord, oldRecord, 
indexedRecord);
+  protected boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, 
HoodieRecord oldRecord, Option<HoodieRecord> combineRecordOp)
+      throws IOException {
+    final boolean result = super.writeUpdateRecord(hoodieRecord, oldRecord, 
combineRecordOp);
     if (result) {
       boolean isDelete = HoodieOperation.isDelete(hoodieRecord.getOperation());
-      cdcLogger.put(hoodieRecord, oldRecord, isDelete ? Option.empty() : 
indexedRecord);
+      cdcLogger.put(hoodieRecord, (GenericRecord) oldRecord.getData(), 
isDelete ? Option.empty() : combineRecordOp.map(rec -> 
((HoodieAvroIndexedRecord) rec).getData()));
     }
     return result;
   }
 
-  protected void writeInsertRecord(HoodieRecord<T> hoodieRecord, 
Option<IndexedRecord> insertRecord) {
-    super.writeInsertRecord(hoodieRecord, insertRecord);
+  protected void writeInsertRecord(HoodieRecord<T> hoodieRecord) throws 
IOException {
+    super.writeInsertRecord(hoodieRecord);
     if (!HoodieOperation.isDelete(hoodieRecord.getOperation())) {
-      cdcLogger.put(hoodieRecord, null, insertRecord);
+      cdcLogger.put(hoodieRecord, null, Option.of((GenericRecord) 
hoodieRecord.getData()));
     }
   }
 
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkWriteHandleFactory.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkWriteHandleFactory.java
index fbc1c7ec55..9759d84cae 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkWriteHandleFactory.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkWriteHandleFactory.java
@@ -20,7 +20,6 @@ package org.apache.hudi.io;
 
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
-import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -39,7 +38,7 @@ public class FlinkWriteHandleFactory {
   /**
    * Returns the write handle factory with given write config.
    */
-  public static <T extends HoodieRecordPayload, I, K, O> Factory<T, I, K, O> 
getFactory(
+  public static <T, I, K, O> Factory<T, I, K, O> getFactory(
       HoodieTableConfig tableConfig,
       HoodieWriteConfig writeConfig) {
     if (writeConfig.allowDuplicateInserts()) {
@@ -58,7 +57,7 @@ public class FlinkWriteHandleFactory {
   //  Inner Class
   // -------------------------------------------------------------------------
 
-  public interface Factory<T extends HoodieRecordPayload, I, K, O> {
+  public interface Factory<T, I, K, O> {
     /**
      * Get or create a new write handle in order to reuse the file handles.
      *
@@ -87,7 +86,7 @@ public class FlinkWriteHandleFactory {
    * Base clazz for commit write handle factory,
    * it encapsulates the handle switching logic: INSERT OR UPSERT.
    */
-  private abstract static class BaseCommitWriteHandleFactory<T extends 
HoodieRecordPayload, I, K, O> implements Factory<T, I, K, O> {
+  private abstract static class BaseCommitWriteHandleFactory<T, I, K, O> 
implements Factory<T, I, K, O> {
     @Override
     public HoodieWriteHandle<?, ?, ?, ?> create(
         Map<String, HoodieWriteHandle<?, ?, ?, ?>> bucketToHandles,
@@ -140,12 +139,12 @@ public class FlinkWriteHandleFactory {
   /**
    * Write handle factory for commit.
    */
-  private static class CommitWriteHandleFactory<T extends HoodieRecordPayload, 
I, K, O>
+  private static class CommitWriteHandleFactory<T, I, K, O>
       extends BaseCommitWriteHandleFactory<T, I, K, O> {
     private static final CommitWriteHandleFactory<?, ?, ?, ?> INSTANCE = new 
CommitWriteHandleFactory<>();
 
     @SuppressWarnings("unchecked")
-    public static <T extends HoodieRecordPayload, I, K, O> 
CommitWriteHandleFactory<T, I, K, O> getInstance() {
+    public static <T, I, K, O> CommitWriteHandleFactory<T, I, K, O> 
getInstance() {
       return (CommitWriteHandleFactory<T, I, K, O>) INSTANCE;
     }
 
@@ -178,12 +177,12 @@ public class FlinkWriteHandleFactory {
   /**
    * Write handle factory for inline clustering.
    */
-  private static class ClusterWriteHandleFactory<T extends 
HoodieRecordPayload, I, K, O>
+  private static class ClusterWriteHandleFactory<T, I, K, O>
       extends BaseCommitWriteHandleFactory<T, I, K, O> {
     private static final ClusterWriteHandleFactory<?, ?, ?, ?> INSTANCE = new 
ClusterWriteHandleFactory<>();
 
     @SuppressWarnings("unchecked")
-    public static <T extends HoodieRecordPayload, I, K, O> 
ClusterWriteHandleFactory<T, I, K, O> getInstance() {
+    public static <T, I, K, O> ClusterWriteHandleFactory<T, I, K, O> 
getInstance() {
       return (ClusterWriteHandleFactory<T, I, K, O>) INSTANCE;
     }
 
@@ -216,12 +215,12 @@ public class FlinkWriteHandleFactory {
   /**
    * Write handle factory for commit, the write handle supports logging change 
logs.
    */
-  private static class CdcWriteHandleFactory<T extends HoodieRecordPayload, I, 
K, O>
+  private static class CdcWriteHandleFactory<T, I, K, O>
       extends BaseCommitWriteHandleFactory<T, I, K, O> {
     private static final CdcWriteHandleFactory<?, ?, ?, ?> INSTANCE = new 
CdcWriteHandleFactory<>();
 
     @SuppressWarnings("unchecked")
-    public static <T extends HoodieRecordPayload, I, K, O> 
CdcWriteHandleFactory<T, I, K, O> getInstance() {
+    public static <T, I, K, O> CdcWriteHandleFactory<T, I, K, O> getInstance() 
{
       return (CdcWriteHandleFactory<T, I, K, O>) INSTANCE;
     }
 
@@ -254,11 +253,11 @@ public class FlinkWriteHandleFactory {
   /**
    * Write handle factory for delta commit.
    */
-  private static class DeltaCommitWriteHandleFactory<T extends 
HoodieRecordPayload, I, K, O> implements Factory<T, I, K, O> {
+  private static class DeltaCommitWriteHandleFactory<T, I, K, O> implements 
Factory<T, I, K, O> {
     private static final DeltaCommitWriteHandleFactory<?, ?, ?, ?> INSTANCE = 
new DeltaCommitWriteHandleFactory<>();
 
     @SuppressWarnings("unchecked")
-    public static <T extends HoodieRecordPayload, I, K, O> 
DeltaCommitWriteHandleFactory<T, I, K, O> getInstance() {
+    public static <T, I, K, O> DeltaCommitWriteHandleFactory<T, I, K, O> 
getInstance() {
       return (DeltaCommitWriteHandleFactory<T, I, K, O>) INSTANCE;
     }
 
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
index ee2cf7a864..4993c43608 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
@@ -26,7 +26,6 @@ import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.model.WriteOperationType;
-import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.index.HoodieIndex;
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java
index 255b0c612c..7edbb55c3e 100644
--- 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java
@@ -24,7 +24,6 @@ import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordMerger;
-import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.index.HoodieIndex;
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index 4ad568a591..4c73012d2c 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -197,7 +197,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
         case ZORDER:
         case HILBERT:
           return isRowPartitioner
-              ? new RowSpatialCurveSortPartitioner(getWriteConfig(), 
recordType)
+              ? new RowSpatialCurveSortPartitioner(getWriteConfig())
               : new RDDSpatialCurveSortPartitioner((HoodieSparkEngineContext) 
getEngineContext(), orderByColumns, layoutOptStrategy,
                   getWriteConfig().getLayoutOptimizationCurveBuildMethod(), 
HoodieAvroUtils.addMetadataFields(schema), recordType);
         case LINEAR:
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java
index 51718a2e94..5ba5c5dfdf 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java
@@ -43,8 +43,6 @@ import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.types.StructType;
 
-import java.util.Arrays;
-import java.util.List;
 import java.util.Properties;
 
 /**
diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
index 296abaf4f5..8d6787b2d5 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
@@ -114,7 +114,7 @@ object HoodieDatasetBulkInsertHelper extends Logging {
    */
   def bulkInsert(dataset: Dataset[Row],
                  instantTime: String,
-                 table: HoodieTable[_ <: HoodieRecordPayload[_ <: 
HoodieRecordPayload[_ <: AnyRef]], _, _, _],
+                 table: HoodieTable[_, _, _, _],
                  writeConfig: HoodieWriteConfig,
                  partitioner: BulkInsertPartitioner[Dataset[Row]],
                  parallelism: Int,
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index b03e559603..c0a0307c3e 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -446,11 +446,6 @@ public class TestHoodieClientOnCopyOnWriteStorage extends 
HoodieClientTestBase {
    */
   private void testDeduplication(
       Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, 
JavaRDD<HoodieRecord>, String> writeFn, boolean populateMetaFields) throws 
Exception {
-    HoodieWriteConfig.Builder configBuilder = 
getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY)
-        .combineInput(true, true);
-    addConfigsForPopulateMetaFields(configBuilder, populateMetaFields);
-    HoodieWriteConfig writeConfig = configBuilder.build();
-
     String newCommitTime = "001";
 
     String recordKey = UUID.randomUUID().toString();
@@ -478,7 +473,8 @@ public class TestHoodieClientOnCopyOnWriteStorage extends 
HoodieClientTestBase {
     when(index.isGlobal()).thenReturn(true);
     HoodieRecordMerger recordMerger = 
HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName());
     int dedupParallelism = records.getNumPartitions() + 100;
-    HoodieData<HoodieRecord<RawTripTestPayload>> dedupedRecsRdd = 
HoodieWriteHelper.newInstance().deduplicateRecords(records, index, 
dedupParallelism, writeConfig.getSchema(), , writeConfig.getProps(), merge);
+    HoodieData<HoodieRecord<RawTripTestPayload>> dedupedRecsRdd =
+        HoodieWriteHelper.newInstance().deduplicateRecords(records, index, 
dedupParallelism, writeConfig.getSchema(), writeConfig.getProps(), 
recordMerger);
     List<HoodieRecord<RawTripTestPayload>> dedupedRecs = 
dedupedRecsRdd.collectAsList();
     assertEquals(records.getNumPartitions(), 
dedupedRecsRdd.getNumPartitions());
     assertEquals(1, dedupedRecs.size());
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieCDCLogRecordIterator.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieCDCLogRecordIterator.java
index f194ddf8f4..f524141552 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieCDCLogRecordIterator.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieCDCLogRecordIterator.java
@@ -19,9 +19,12 @@
 package org.apache.hudi.common.table.log;
 
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
 import org.apache.hudi.common.table.log.block.HoodieDataBlock;
 import org.apache.hudi.common.util.ClosableIterator;
+import org.apache.hudi.common.util.MappingIterator;
 import org.apache.hudi.exception.HoodieIOException;
 
 import org.apache.avro.Schema;
@@ -59,7 +62,8 @@ public class HoodieCDCLogRecordIterator implements 
ClosableIterator<IndexedRecor
     if (itr == null || !itr.hasNext()) {
       if (reader.hasNext()) {
         HoodieDataBlock dataBlock = (HoodieDataBlock) reader.next();
-        itr = dataBlock.getRecordIterator();
+        // TODO support cdc with spark record.
+        itr = new 
MappingIterator(dataBlock.getRecordIterator(HoodieRecordType.AVRO), record -> 
((HoodieAvroIndexedRecord) record).getData());
         return itr.hasNext();
       }
       return false;
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
index 9c6135dd28..17dc03adec 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
@@ -22,6 +22,9 @@ import org.apache.hudi.common.fs.SizeAwareDataInputStream;
 import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
+import 
org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
+import 
org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockContentLocation;
+import 
org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType;
 import org.apache.hudi.common.util.ClosableIterator;
 import org.apache.hudi.common.util.MappingIterator;
 import org.apache.hudi.common.util.Option;
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCDCDataBlock.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCDCDataBlock.java
index cc5663262c..a373a5ceb5 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCDCDataBlock.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCDCDataBlock.java
@@ -19,10 +19,13 @@
 package org.apache.hudi.common.table.log.block;
 
 import org.apache.avro.Schema;
-import org.apache.avro.generic.IndexedRecord;
 
 import org.apache.hadoop.fs.FSDataInputStream;
 
+import org.apache.hudi.common.model.HoodieRecord;
+import 
org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
+import 
org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockContentLocation;
+import 
org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType;
 import org.apache.hudi.common.util.Option;
 
 import java.util.HashMap;
@@ -46,7 +49,7 @@ public class HoodieCDCDataBlock extends HoodieAvroDataBlock {
         Option.of(readerSchema), header, new HashMap<>(), keyField, null);
   }
 
-  public HoodieCDCDataBlock(List<IndexedRecord> records,
+  public HoodieCDCDataBlock(List<HoodieRecord> records,
                             Map<HeaderMetadataType, String> header,
                             String keyField) {
     super(records, header, keyField);
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
index 18437b5650..ba63b982f3 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
@@ -433,7 +433,7 @@ public class StreamWriteFunction<I> extends 
AbstractStreamWriteFunction<I> {
     if (config.getBoolean(FlinkOptions.PRE_COMBINE)) {
       Properties props = new Properties();
       config.addAllToProperties(props);
-      records = FlinkWriteHelper.newInstance().deduplicateRecords(records, 
(HoodieIndex) null, -1, props, this.writeClient.getConfig().getSchema(), 
recordMerger);
+      records = FlinkWriteHelper.newInstance().deduplicateRecords(records, 
(HoodieIndex) null, -1, this.writeClient.getConfig().getSchema(), props, 
recordMerger);
     }
     bucket.preWrite(records);
     final List<WriteStatus> writeStatus = new 
ArrayList<>(writeFunction.apply(records, instant));
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index b8d4802d51..5364f6d96b 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -792,8 +792,8 @@ public class MergeOnReadInputFormat
               continue;
             }
             IndexedRecord avroRecord = avroProjection.isPresent()
-                ? avroProjection.get().apply(mergedAvroRecord.get())
-                : mergedAvroRecord.get();
+                ? avroProjection.get().apply(mergedAvroRecord.get().getData())
+                : mergedAvroRecord.get().getData();
             this.currentRecord = (RowData) 
avroToRowDataConverter.convert(avroRecord);
             this.currentRecord.setRowKind(rowKind);
             return false;
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 87e04e8fd2..f18ef4fc13 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -295,7 +295,8 @@ object HoodieSparkSqlWriter {
               tblName, mapAsJavaMap(addSchemaEvolutionParameters(parameters, 
internalSchemaOpt) - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key)
             )).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
             val writeConfig = client.getConfig
-            if (writeConfig.getRecordMerger.getRecordType == 
HoodieRecordType.SPARK && 
writeConfig.getLogDataBlockFormat.orElse(HoodieLogBlockType.AVRO_DATA_BLOCK) != 
HoodieLogBlockType.PARQUET_DATA_BLOCK) {
+            if (writeConfig.getRecordMerger.getRecordType == 
HoodieRecordType.SPARK && tableType == HoodieTableType.MERGE_ON_READ &&
+              
writeConfig.getLogDataBlockFormat.orElse(HoodieLogBlockType.AVRO_DATA_BLOCK) != 
HoodieLogBlockType.PARQUET_DATA_BLOCK) {
               throw new 
UnsupportedOperationException(s"${writeConfig.getRecordMerger.getClass.getName} 
only support parquet log.")
             }
             // Create a HoodieWriteClient & issue the write.
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/LogFileIterator.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/LogFileIterator.scala
index 07a0ce7f23..87758ebd3c 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/LogFileIterator.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/LogFileIterator.scala
@@ -21,33 +21,35 @@ package org.apache.hudi
 import org.apache.hudi.HoodieBaseRelation.{BaseFileReader, 
generateUnsafeProjection}
 import org.apache.hudi.HoodieConversionUtils.{toJavaOption, toScalaOption}
 import org.apache.hudi.HoodieDataSourceHelper.AvroDeserializerSupport
-import org.apache.hudi.common.model.{HoodieLogFile, HoodieRecord, 
HoodieRecordPayload}
+import org.apache.hudi.common.model.{HoodieAvroIndexedRecord, 
HoodieEmptyRecord, HoodieLogFile, HoodieRecord, HoodieRecordPayload}
 import org.apache.hudi.config.HoodiePayloadConfig
+import org.apache.hudi.commmon.model.HoodieSparkRecord
 import 
org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
 import org.apache.hudi.LogFileIterator._
 import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig}
-import org.apache.hudi.common.engine.HoodieLocalEngineContext
+import org.apache.hudi.common.engine.{EngineType, HoodieLocalEngineContext}
+import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath
 import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner
+import org.apache.hudi.common.util.{HoodieRecordUtils, SerializationUtils}
 import org.apache.hudi.hadoop.config.HoodieRealtimeConfig
 import org.apache.hudi.internal.schema.InternalSchema
 import org.apache.hudi.metadata.{HoodieBackedTableMetadata, 
HoodieTableMetadata}
 import 
org.apache.hudi.metadata.HoodieTableMetadata.getDataTableBasePathFromMetadataTable
-
-import org.apache.avro.Schema
+import org.apache.avro.{Schema, SchemaNormalization}
 import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, 
IndexedRecord}
-
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapred.JobConf
-
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.hudi.SparkStructTypeSerializer
+import org.apache.spark.sql.HoodieCatalystExpressionUtils
 import org.apache.spark.sql.types.StructType
-
 import java.io.Closeable
+import java.nio.charset.StandardCharsets
 import java.util.Properties
-
+import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
 import scala.annotation.tailrec
 import scala.collection.JavaConverters._
 import scala.util.Try
@@ -77,10 +79,13 @@ class LogFileIterator(split: HoodieMergeOnReadFileSplit,
   protected override val structTypeSchema: StructType = 
requiredSchema.structTypeSchema
 
   protected val logFileReaderAvroSchema: Schema = new 
Schema.Parser().parse(tableSchema.avroSchemaStr)
+  protected val logFileReaderStructType: StructType = 
tableSchema.structTypeSchema
 
   protected var recordToLoad: InternalRow = _
 
-  private val requiredSchemaSafeAvroProjection = 
SafeAvroProjection.create(logFileReaderAvroSchema, avroSchema)
+  protected val requiredSchemaSafeAvroProjection: SafeAvroProjection = 
SafeAvroProjection.create(logFileReaderAvroSchema, avroSchema)
+
+  protected val requiredSchemaSafeRowProjection: UnsafeProjection = 
HoodieCatalystExpressionUtils.generateUnsafeProjection(logFileReaderStructType, 
structTypeSchema)
 
   // TODO: now logScanner with internalSchema support column project, we may 
no need projectAvroUnsafe
   private var logScanner = {
@@ -91,21 +96,20 @@ class LogFileIterator(split: HoodieMergeOnReadFileSplit,
 
   private val logRecords = logScanner.getRecords.asScala
 
-  def logRecordsIterator(): Iterator[(String, HoodieRecord[_ <: 
HoodieRecordPayload[_ <: HoodieRecordPayload[_ <: AnyRef]]])] = {
-    logRecords.iterator.asInstanceOf[Iterator[(String, HoodieRecord[_ <: 
HoodieRecordPayload[_ <: HoodieRecordPayload[_ <: AnyRef]]])]]
+  def logRecordsPairIterator(): Iterator[(String, HoodieRecord[_])] = {
+    logRecords.iterator
   }
 
   // NOTE: This have to stay lazy to make sure it's initialized only at the 
point where it's
   //       going to be used, since we modify `logRecords` before that and 
therefore can't do it any earlier
-  protected lazy val genericRecordsIterator: Iterator[Option[GenericRecord]] =
-  logRecords.iterator.map {
-    case (_, record) =>
-      toScalaOption(record.getData.getInsertValue(logFileReaderAvroSchema, 
payloadProps))
-        .map(_.asInstanceOf[GenericRecord])
+  protected lazy val logRecordsIterator: Iterator[Option[HoodieRecord[_]]] = 
logRecords.iterator.map {
+      case (_, record: HoodieSparkRecord) => Option(record)
+      case (_, _: HoodieEmptyRecord[_]) => Option.empty
+      case (_, record) =>
+        toScalaOption(record.toIndexedRecord(logFileReaderAvroSchema, 
payloadProps))
   }
 
-  protected def removeLogRecord(key: String): Option[HoodieRecord[_ <: 
HoodieRecordPayload[_]]] =
-    logRecords.remove(key)
+  protected def removeLogRecord(key: String): Option[HoodieRecord[_]] = 
logRecords.remove(key)
 
   override def hasNext: Boolean = hasNextInternal
 
@@ -113,15 +117,16 @@ class LogFileIterator(split: HoodieMergeOnReadFileSplit,
   //       that recursion is unfolded into a loop to avoid stack overflows 
while
   //       handling records
   @tailrec private def hasNextInternal: Boolean = {
-    genericRecordsIterator.hasNext && {
-      val avroRecordOpt = genericRecordsIterator.next()
-      if (avroRecordOpt.isEmpty) {
-        // Record has been deleted, skipping
-        this.hasNextInternal
-      } else {
-        val projectedAvroRecord = 
requiredSchemaSafeAvroProjection(avroRecordOpt.get)
-        recordToLoad = deserialize(projectedAvroRecord)
-        true
+    logRecordsIterator.hasNext && {
+      logRecordsIterator.next() match {
+        case Some(r: HoodieAvroIndexedRecord) =>
+          val projectedAvroRecord = 
requiredSchemaSafeAvroProjection(r.getData.asInstanceOf[GenericRecord])
+          recordToLoad = deserialize(projectedAvroRecord)
+          true
+        case Some(r: HoodieSparkRecord) =>
+          recordToLoad = requiredSchemaSafeRowProjection(r.getData)
+          true
+        case None => this.hasNextInternal
       }
     }
   }
@@ -196,6 +201,10 @@ class RecordMergingFileIterator(split: 
HoodieMergeOnReadFileSplit,
 
   private val baseFileIterator = baseFileReader(split.dataFile.get)
 
+  private val mergerList = tableState.mergerImpls.split(",")
+    .map(_.trim).distinct.toList.asJava
+  private val recordMerger = 
HoodieRecordUtils.generateRecordMerger(tableState.tablePath, EngineType.SPARK, 
mergerList, tableState.mergerStrategy)
+
   override def hasNext: Boolean = hasNextInternal
 
   // NOTE: It's crucial for this method to be annotated w/ [[@tailrec]] to 
make sure
@@ -211,14 +220,12 @@ class RecordMergingFileIterator(split: 
HoodieMergeOnReadFileSplit,
         recordToLoad = requiredSchemaUnsafeProjection(curRow)
         true
       } else {
-        val mergedAvroRecordOpt = merge(serialize(curRow), 
updatedRecordOpt.get)
-        if (mergedAvroRecordOpt.isEmpty) {
+       val mergedRecordOpt = merge(curRow, updatedRecordOpt.get)
+        if (mergedRecordOpt.isEmpty) {
           // Record has been deleted, skipping
           this.hasNextInternal
         } else {
-          val projectedAvroRecord = 
projectAvroUnsafe(mergedAvroRecordOpt.get.asInstanceOf[GenericRecord],
-            avroSchema, reusableRecordBuilder)
-          recordToLoad = deserialize(projectedAvroRecord)
+          recordToLoad = mergedRecordOpt.get
           true
         }
       }
@@ -230,10 +237,22 @@ class RecordMergingFileIterator(split: 
HoodieMergeOnReadFileSplit,
   private def serialize(curRowRecord: InternalRow): GenericRecord =
     serializer.serialize(curRowRecord).asInstanceOf[GenericRecord]
 
-  private def merge(curAvroRecord: GenericRecord, newRecord: HoodieRecord[_ <: 
HoodieRecordPayload[_]]): Option[IndexedRecord] = {
+  private def merge(curRow: InternalRow, newRecord: HoodieRecord[_]): 
Option[InternalRow] = {
     // NOTE: We have to pass in Avro Schema used to read from Delta Log file 
since we invoke combining API
     //       on the record from the Delta Log
-    toScalaOption(newRecord.getData.combineAndGetUpdateValue(curAvroRecord, 
logFileReaderAvroSchema, payloadProps))
+    recordMerger.getRecordType match {
+      case HoodieRecordType.SPARK =>
+        val curRecord = new HoodieSparkRecord(curRow, baseFileReader.schema)
+        toScalaOption(recordMerger.merge(curRecord, newRecord, 
logFileReaderAvroSchema, payloadProps))
+          .map(r => {
+            val projection = 
HoodieInternalRowUtils.getCachedUnsafeProjection(r.asInstanceOf[HoodieSparkRecord].getStructType,
 structTypeSchema)
+            projection.apply(r.getData.asInstanceOf[InternalRow])
+          })
+      case _ =>
+        val curRecord = new HoodieAvroIndexedRecord(serialize(curRow))
+        toScalaOption(recordMerger.merge(curRecord, newRecord, 
logFileReaderAvroSchema, payloadProps))
+        .map(r => 
deserialize(projectAvroUnsafe(r.toIndexedRecord(logFileReaderAvroSchema, new 
Properties()).get().getData.asInstanceOf[GenericRecord], avroSchema, 
reusableRecordBuilder)))
+    }
   }
 }
 
@@ -302,6 +321,15 @@ object LogFileIterator {
           getRelativePartitionPath(new Path(tableState.tablePath), 
logFiles.head.getPath.getParent))
       }
 
+      val mergerList = tableState.mergerImpls.split(",")
+        .map(_.trim).distinct.toList.asJava
+      val recordMerger = 
HoodieRecordUtils.generateRecordMerger(tableState.tablePath, EngineType.SPARK, 
mergerList, tableState.mergerStrategy)
+      logRecordScannerBuilder.withRecordMerger(recordMerger)
+
+      if (recordMerger.getRecordType == HoodieRecordType.SPARK) {
+        
registerStructTypeSerializerIfNeed(List(HoodieInternalRowUtils.getCachedSchema(logSchema)))
+      }
+
       logRecordScannerBuilder.build()
     }
   }
@@ -320,4 +348,11 @@ object LogFileIterator {
       .getOrElse(split.logFiles.head.getPath)
       .getParent
   }
+
+  private def registerStructTypeSerializerIfNeed(schemas: List[StructType]): 
Unit = {
+    val schemaMap = schemas.map(schema => 
(SchemaNormalization.fingerprint64(schema.json.getBytes(StandardCharsets.UTF_8)),
 schema))
+      .toMap
+    val serializer = new SparkStructTypeSerializer(schemaMap)
+    SerializationUtils.setOverallRegister(classOf[HoodieSparkRecord].getName, 
serializer)
+  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
index 598059c030..ad60f6187b 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
@@ -24,12 +24,13 @@ import org.apache.hudi.HoodieConversionUtils._
 import org.apache.hudi.HoodieDataSourceHelper.AvroDeserializerSupport
 import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
-import org.apache.hudi.common.model.{FileSlice, HoodieLogFile, HoodieRecord, 
HoodieRecordPayload}
+import org.apache.hudi.common.model.{FileSlice, HoodieAvroRecordMerger, 
HoodieLogFile, HoodieRecord, HoodieRecordPayload}
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.cdc.HoodieCDCInferCase._
 import org.apache.hudi.common.table.cdc.HoodieCDCOperation._
 import org.apache.hudi.common.table.cdc.{HoodieCDCFileSplit, 
HoodieCDCSupplementalLoggingMode, HoodieCDCUtils}
 import org.apache.hudi.common.table.log.HoodieCDCLogRecordIterator
+import org.apache.hudi.common.util.StringUtils
 import org.apache.hudi.common.util.ValidationUtils.checkState
 import org.apache.hudi.config.{HoodiePayloadConfig, HoodieWriteConfig}
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions
@@ -151,7 +152,7 @@ class HoodieCDCRDD(
     private lazy val tableState = {
       val metadataConfig = HoodieMetadataConfig.newBuilder()
         .fromProperties(props)
-        .build();
+        .build()
       HoodieTableState(
         pathToString(basePath),
         split.changes.last.getInstant,
@@ -159,7 +160,10 @@ class HoodieCDCRDD(
         preCombineFieldOpt,
         usesVirtualKeys = false,
         metaClient.getTableConfig.getPayloadClass,
-        metadataConfig
+        metadataConfig,
+        // TODO support CDC with spark record
+        mergerImpls = classOf[HoodieAvroRecordMerger].getName,
+        mergerStrategy = StringUtils.DEFAULT_MERGER_STRATEGY_UUID
       )
     }
 
@@ -217,7 +221,7 @@ class HoodieCDCRDD(
      * Only one case where it will be used is that extract the change data 
from log files for mor table.
      * At the time, 'logRecordIter' will work with [[beforeImageRecords]] that 
keep all the records of the previous file slice.
      */
-    private var logRecordIter: Iterator[(String, HoodieRecord[_ <: 
HoodieRecordPayload[_ <: HoodieRecordPayload[_ <: AnyRef]]])] = Iterator.empty
+    private var logRecordIter: Iterator[(String, HoodieRecord[_])] = 
Iterator.empty
 
     /**
      * Only one case where it will be used is that extract the change data 
from cdc log files.
@@ -434,7 +438,7 @@ class HoodieCDCRDD(
             val absLogPath = new Path(basePath, currentChangeFile.getCdcFile)
             val morSplit = HoodieMergeOnReadFileSplit(None, List(new 
HoodieLogFile(fs.getFileStatus(absLogPath))))
             val logFileIterator = new LogFileIterator(morSplit, 
originTableSchema, originTableSchema, tableState, conf)
-            logRecordIter = logFileIterator.logRecordsIterator()
+            logRecordIter = logFileIterator.logRecordsPairIterator
           case AS_IS =>
             assert(currentChangeFile.getCdcFile != null)
             // load beforeFileSlice to beforeImageRecords
@@ -600,9 +604,9 @@ class HoodieCDCRDD(
     }
 
     private def getInsertValue(
-        record: HoodieRecord[_ <: HoodieRecordPayload[_ <: 
HoodieRecordPayload[_ <: AnyRef]]])
+        record: HoodieRecord[_])
     : Option[IndexedRecord] = {
-      toScalaOption(record.getData.getInsertValue(avroSchema, payloadProps))
+      toScalaOption(record.toIndexedRecord(avroSchema, 
payloadProps)).map(_.getData)
     }
 
     private def projectAvroUnsafe(record: IndexedRecord): GenericRecord = {
@@ -610,8 +614,8 @@ class HoodieCDCRDD(
         avroSchema, reusableRecordBuilder)
     }
 
-    private def merge(curAvroRecord: GenericRecord, newRecord: HoodieRecord[_ 
<: HoodieRecordPayload[_]]): IndexedRecord = {
-      newRecord.getData.combineAndGetUpdateValue(curAvroRecord, avroSchema, 
payloadProps).get()
+    private def merge(curAvroRecord: GenericRecord, newRecord: 
HoodieRecord[_]): IndexedRecord = {
+      
newRecord.getData.asInstanceOf[HoodieRecordPayload[_]].combineAndGetUpdateValue(curAvroRecord,
 avroSchema, payloadProps).get()
     }
 
     private def generateUnsafeProjection(from: StructType, to: StructType): 
UnsafeProjection =
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index bdf2da7848..37d320e192 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -72,10 +72,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
     HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
     HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key -> "1"
   )
-  val sparkOpts = Map(
-    HoodieWriteConfig.MERGER_IMPLS.key -> 
classOf[HoodieSparkRecordMerger].getName,
-    HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key -> "parquet"
-  )
+  val sparkOpts = Map(HoodieWriteConfig.MERGER_IMPLS.key -> 
classOf[HoodieSparkRecordMerger].getName)
 
   val verificationCol: String = "driver"
   val updatedVerificationVal: String = "driver_update"
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/HoodieCDCTestBase.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/HoodieCDCTestBase.scala
index 5f447a9bce..c15123d55a 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/HoodieCDCTestBase.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/HoodieCDCTestBase.scala
@@ -20,7 +20,7 @@ package org.apache.hudi.functional.cdc
 import org.apache.hudi.DataSourceReadOptions._
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.common.config.HoodieMetadataConfig
-import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieKey, 
HoodieLogFile, HoodieRecord}
+import org.apache.hudi.common.model.{HoodieAvroIndexedRecord, 
HoodieCommitMetadata, HoodieKey, HoodieLogFile, HoodieRecord}
 import org.apache.hudi.common.table.cdc.HoodieCDCOperation
 import org.apache.hudi.common.table.HoodieTableConfig
 import org.apache.hudi.common.table.log.HoodieLogFormat
@@ -29,17 +29,13 @@ import org.apache.hudi.common.table.timeline.HoodieInstant
 import org.apache.hudi.common.testutils.RawTripTestPayload
 import org.apache.hudi.config.{HoodieCleanConfig, HoodieWriteConfig}
 import org.apache.hudi.testutils.HoodieClientTestBase
-
 import org.apache.avro.Schema
 import org.apache.avro.generic.{GenericRecord, IndexedRecord}
-
 import org.apache.hadoop.fs.Path
-
+import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
 import org.apache.spark.sql.{DataFrame, SparkSession}
-
 import org.junit.jupiter.api.{AfterEach, BeforeEach}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertNotEquals, 
assertNull, assertTrue}
-
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
 
@@ -122,8 +118,8 @@ abstract class HoodieCDCTestBase extends 
HoodieClientTestBase {
     val reader = HoodieLogFormat.newReader(fs, logFile, cdcSchema);
     assertTrue(reader.hasNext);
 
-    val block = reader.next().asInstanceOf[HoodieDataBlock];
-    block.getRecordIterator.asScala.toList
+    val block = reader.next().asInstanceOf[HoodieDataBlock]
+    
block.getRecordIterator[IndexedRecord](HoodieRecordType.AVRO).asScala.toList.map(_.getData)
   }
 
   protected def checkCDCDataForInsertOrUpdate(cdcSupplementalLoggingMode: 
String,
diff --git a/rfc/rfc-46/rfc-46.md b/rfc/rfc-46/rfc-46.md
index a851a4443a..192bdbf8c6 100644
--- a/rfc/rfc-46/rfc-46.md
+++ b/rfc/rfc-46/rfc-46.md
@@ -38,7 +38,7 @@ when dealing with records (during merge, column value 
extractions, writing into
 
 While having a single format of the record representation is certainly making 
implementation of some components simpler, 
 it bears unavoidable performance penalty of de-/serialization loop: every 
record handled by Hudi has to be converted
-from (low-level) engine-specific representation (`Row` for Spark, `RowData` 
for Flink, `ArrayWritable` for Hive) into intermediate 
+from (low-level) engine-specific representation (`InternalRow` for Spark, 
`RowData` for Flink, `ArrayWritable` for Hive) into intermediate 
 one (Avro), with some operations (like clustering, compaction) potentially 
incurring this penalty multiple times (on read- 
 and write-paths). 
 
@@ -84,59 +84,105 @@ is known to have poor performance (compared to 
non-reflection based instantiatio
 
 #### Record Merge API
 
-Stateless component interface providing for API Combining Records will look 
like following:
+CombineAndGetUpdateValue and Precombine will converge to one API. Stateless 
component interface providing for API Combining Records will look like 
following:
 
 ```java
-interface HoodieMerge {
-   HoodieRecord preCombine(HoodieRecord older, HoodieRecord newer);
-
-   Option<HoodieRecord> combineAndGetUpdateValue(HoodieRecord older, 
HoodieRecord newer, Schema schema, Properties props) throws IOException;
-}
+interface HoodieRecordMerger {
 
    /**
-    * Spark-specific implementation 
+    * The kind of merging strategy this recordMerger belongs to. A UUID 
represents merging strategy.
     */
-   class HoodieSparkRecordMerge implements HoodieMerge {
+   String getMergingStrategy();
+  
+   // This method converges combineAndGetUpdateValue and precombine from 
HoodiePayload. 
+   // It'd be associative operation: f(a, f(b, c)) = f(f(a, b), c) (which we 
can translate as having 3 versions A, B, C of the single record, both orders of 
operations applications have to yield the same result)
+   Option<HoodieRecord> merge(HoodieRecord older, HoodieRecord newer, Schema 
schema, Properties props) throws IOException;
+   
+   // The record type handled by the current merger
+   // SPARK, AVRO, FLINK
+   HoodieRecordType getRecordType();
+}
 
-      @Override
-      public HoodieRecord preCombine(HoodieRecord older, HoodieRecord newer) {
-        // HoodieSparkRecords preCombine
-      }
+/**
+ * Spark-specific implementation 
+ */
+class HoodieSparkRecordMerger implements HoodieRecordMerger {
+
+  @Override
+  public String getMergingStrategy() {
+    return UUID_MERGER_STRATEGY;
+  }
+  
+   @Override
+   Option<HoodieRecord> merge(HoodieRecord older, HoodieRecord newer, Schema 
schema, Properties props) throws IOException {
+     // HoodieSparkRecord precombine and combineAndGetUpdateValue. It'd be 
associative operation.
+   }
 
-      @Override
-      public Option<HoodieRecord> combineAndGetUpdateValue(HoodieRecord older, 
HoodieRecord newer, Schema schema, Properties props) {
-         // HoodieSparkRecord combineAndGetUpdateValue
-      }
+   @Override
+   HoodieRecordType getRecordType() {
+     return HoodieRecordType.SPARK;
    }
+}
    
-   /**
-    * Flink-specific implementation 
-    */
-   class HoodieFlinkRecordMerge implements HoodieMerge {
-
-      @Override
-      public HoodieRecord preCombine(HoodieRecord older, HoodieRecord newer) {
-        // HoodieFlinkRecord preCombine
-      }
+/**
+ * Flink-specific implementation 
+ */
+class HoodieFlinkRecordMerger implements HoodieRecordMerger {
+
+   @Override
+   public String getMergingStrategy() {
+      return UUID_MERGER_STRATEGY;
+   }
+  
+   @Override
+   Option<HoodieRecord> merge(HoodieRecord older, HoodieRecord newer, Schema 
schema, Properties props) throws IOException {
+      // HoodieFlinkRecord precombine and combineAndGetUpdateValue. It'd be 
associative operation.
+   }
 
-      @Override
-      public Option<HoodieRecord> combineAndGetUpdateValue(HoodieRecord older, 
HoodieRecord newer, Schema schema, Properties props) {
-         // HoodieFlinkRecord combineAndGetUpdateValue
-      }
+   @Override
+   HoodieRecordType getRecordType() {
+      return HoodieRecordType.FLINK;
    }
+}
 ```
 Where user can provide their own subclass implementing such interface for the 
engines of interest.
 
-#### Migration from `HoodieRecordPayload` to `HoodieMerge`
+#### Migration from `HoodieRecordPayload` to `HoodieRecordMerger`
 
 To warrant backward-compatibility (BWC) on the code-level with already created 
subclasses of `HoodieRecordPayload` currently
-already used in production by Hudi users, we will provide a BWC-bridge in the 
form of instance of `HoodieMerge`, that will 
+already used in production by Hudi users, we will provide a BWC-bridge in the 
form of instance of `HoodieRecordMerger` called `HoodieAvroRecordMerger`, that 
will 
 be using user-defined subclass of `HoodieRecordPayload` to combine the records.
 
-Leveraging such bridge will make provide for seamless BWC migration to the 
0.11 release, however will be removing the performance 
+Leveraging such bridge will provide for seamless BWC migration to the 0.11 
release, however will be removing the performance 
 benefit of this refactoring, since it would unavoidably have to perform 
conversion to intermediate representation (Avro). To realize
 full-suite of benefits of this refactoring, users will have to migrate their 
merging logic out of `HoodieRecordPayload` subclass and into
-new `HoodieMerge` implementation.
+new `HoodieRecordMerger` implementation.
+
+Precombine is used to merge records from logs or incoming records; 
CombineAndGetUpdateValue is used to merge record from log file and record from 
base file.
+these two merge logics are unified in HoodieAvroRecordMerger as merge 
function. `HoodieAvroRecordMerger`'s API will look like following:
+
+```java
+/**
+ * Backward compatibility HoodieRecordPayload implementation 
+ */
+class HoodieAvroRecordMerger implements HoodieRecordMerger {
+
+   @Override
+   public String getMergingStrategy() {
+      return UUID_MERGER_STRATEGY;
+   }
+  
+   @Override
+   Option<HoodieRecord> merge(HoodieRecord older, HoodieRecord newer, Schema 
schema, Properties props) throws IOException {
+      // HoodieAvroRecordMerger precombine and combineAndGetUpdateValue. It'd 
be associative operation.
+   }
+
+   @Override
+   HoodieRecordType getRecordType() {
+      return HoodieRecordType.AVRO;
+   }
+}
+```
 
 ### Refactoring Flows Directly Interacting w/ Records:
 
@@ -156,13 +202,66 @@ Following major components will be refactored:
 3. `HoodieRealtimeRecordReader`s 
    1. API will be returning opaque `HoodieRecord` instead of raw Avro payload
 
+### Config for RecordMerger
+The RecordMerger is engine-aware. We provide a config called MERGER_IMPLS. You 
can set a list of RecordMerger class name to it. And you can set 
MERGER_STRATEGY which is UUID of RecordMerger. Hudi will pick RecordMergers in 
MERGER_IMPLS which has the same MERGER_STRATEGY according to the engine type at 
runtime.
+
+### Public Api in HoodieRecord
+Because we implement different types of records, we need to implement 
functionality similar to AvroUtils in HoodieRecord for different data(avro, 
InternalRow, RowData).
+Its public API will look like following:
+
+```java
+import java.util.Properties;
+
+class HoodieRecord {
+
+   /**
+    * Get column in record to support RDDCustomColumnsSortPartitioner
+    */
+   Object getRecordColumnValues(Schema recordSchema, String[] columns,
+           boolean consistentLogicalTimestampEnabled);
+
+   /**
+    * Support bootstrap.
+    */
+   HoodieRecord mergeWith(HoodieRecord other, Schema targetSchema) throws 
IOException;
+
+   /**
+    * Rewrite record into new schema(add meta columns)
+    */
+   HoodieRecord rewriteRecord(Schema recordSchema, Properties props, Schema 
targetSchema)
+           throws IOException;
+
+   /**
+    * Support schema evolution.
+    */
+   HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties 
props, Schema newSchema,
+           Map<String, String> renameCols) throws IOException;
+
+   HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties 
props, Schema newSchema) throws IOException;
+
+   HoodieRecord updateMetadataValues(Schema recordSchema, Properties props,
+           MetadataValues metadataValues) throws IOException;
+
+   boolean isDelete(Schema recordSchema, Properties props) throws IOException;
+
+   /**
+    * Is EmptyRecord. Generated by ExpressionPayload.
+    */
+   boolean shouldIgnore(Schema recordSchema, Properties props) throws 
IOException;
+
+   Option<HoodieAvroIndexedRecord> toIndexedRecord(Schema schema, Properties 
props)
+           throws IOException;
+
+   // Other functions with getter or setter ...
+}
+```
 
 ## Rollout/Adoption Plan
 
  - What impact (if any) will there be on existing users? 
    - Users of the Hudi will observe considerably better performance for most 
of the routine operations: writing, reading, compaction, clustering, etc due to 
avoiding the superfluous intermediate de-/serialization penalty
    - By default, modified hierarchy would still leverage 
-   - Users will need to rebase their logic of combining records by creating a 
subclass of `HoodieRecordPayload`, and instead subclass newly created interface 
`HoodieMerge` to get full-suite of performance benefits 
+   - Users will need to rebase their logic of combining records by creating a 
subclass of `HoodieRecordPayload`, and instead subclass newly created interface 
`HoodieRecordMerger` to get full-suite of performance benefits 
  - If we are changing behavior how will we phase out the older behavior?
    - Older behavior leveraging `HoodieRecordPayload` for merging will be 
marked as deprecated in 0.11, and subsequently removed in 0.1x
  - If we need special migration tools, describe them here.

Reply via email to