This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit ee7764b9749d929917a4e9641f4a32f269674100
Author: Alexey Kudinkin <[email protected]>
AuthorDate: Fri Jan 27 18:56:32 2023 -0800

    [HUDI-5023] Switching default Write Executor type to `SIMPLE` (#7476)
    
    This change switches the default Write Executor to SIMPLE, i.e. one bypassing 
reliance on any kind of queue (either BoundedInMemory or Disruptor's).
    
    This should considerably trim down on:
    
    - Runtime (compared to BIMQ)
    - Compute wasted (compared to BIMQ, Disruptor)
    This is because it eliminates the unnecessary intermediary "staging" of the 
records in the queue (for example, in Spark such in-memory enqueueing occurs at 
the ingress points, i.e. shuffling), and allows handling record writing in one 
pass (even avoiding making copies of the records in the future).
---
 .../org/apache/hudi/config/HoodieWriteConfig.java  | 48 +++++++++++---------
 .../hudi/execution/HoodieLazyInsertIterable.java   | 30 +++++++------
 .../java/org/apache/hudi/util/ExecutorFactory.java | 24 +++++-----
 .../hudi/execution/FlinkLazyInsertIterable.java    |  2 +-
 .../hudi/execution/JavaLazyInsertIterable.java     |  2 +-
 .../hudi/execution/SparkLazyInsertIterable.java    |  2 +-
 .../TestBoundedInMemoryExecutorInSpark.java        | 28 ++++++------
 .../hudi/execution/TestBoundedInMemoryQueue.java   | 23 ++++++----
 .../execution/TestDisruptorExecutionInSpark.java   | 48 ++++++++++----------
 .../hudi/execution/TestDisruptorMessageQueue.java  | 30 +++++++------
 .../hudi/execution/TestSimpleExecutionInSpark.java | 51 +++++++++-------------
 .../common/util/queue/BoundedInMemoryExecutor.java |  2 +-
 .../hudi/common/util/queue/DisruptorExecutor.java  | 19 +++++---
 .../common/util/queue/DisruptorMessageQueue.java   | 18 +++++---
 .../hudi/common/util/queue/ExecutorType.java       | 15 +------
 ...mpleHoodieExecutor.java => SimpleExecutor.java} | 51 ++++++++++------------
 .../common/util/queue/WaitStrategyFactory.java     |  8 ++--
 17 files changed, 200 insertions(+), 201 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 2f36aa725e3..b9d7c800250 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -54,6 +54,8 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.VisibleForTesting;
+import org.apache.hudi.common.util.queue.DisruptorWaitStrategyType;
 import org.apache.hudi.common.util.queue.ExecutorType;
 import org.apache.hudi.config.metrics.HoodieMetricsCloudWatchConfig;
 import org.apache.hudi.config.metrics.HoodieMetricsConfig;
@@ -97,8 +99,7 @@ import java.util.Properties;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
-import static org.apache.hudi.common.util.queue.ExecutorType.BOUNDED_IN_MEMORY;
-import static org.apache.hudi.common.util.queue.ExecutorType.DISRUPTOR;
+import static org.apache.hudi.common.util.queue.ExecutorType.SIMPLE;
 import static org.apache.hudi.config.HoodieCleanConfig.CLEANER_POLICY;
 import static 
org.apache.hudi.table.marker.ConflictDetectionUtils.getDefaultEarlyConflictDetectionStrategy;
 
@@ -158,10 +159,10 @@ public class HoodieWriteConfig extends HoodieConfig {
       .withDocumentation("Key generator class, that implements 
`org.apache.hudi.keygen.KeyGenerator` "
           + "extract a key out of incoming records.");
 
-  public static final ConfigProperty<String> EXECUTOR_TYPE = ConfigProperty
+  public static final ConfigProperty<String> WRITE_EXECUTOR_TYPE = 
ConfigProperty
       .key("hoodie.write.executor.type")
-      .defaultValue(BOUNDED_IN_MEMORY.name())
-      .withValidValues(BOUNDED_IN_MEMORY.name(), DISRUPTOR.name())
+      .defaultValue(SIMPLE.name())
+      
.withValidValues(Arrays.stream(ExecutorType.values()).map(Enum::name).toArray(String[]::new))
       .sinceVersion("0.13.0")
       .withDocumentation("Set executor which orchestrates concurrent producers 
and consumers communicating through a message queue."
           + "BOUNDED_IN_MEMORY(default): Use LinkedBlockingQueue as a bounded 
in-memory queue, this queue will use extra lock to balance producers and 
consumer"
@@ -271,15 +272,15 @@ public class HoodieWriteConfig extends HoodieConfig {
       .defaultValue(String.valueOf(4 * 1024 * 1024))
       .withDocumentation("Size of in-memory buffer used for parallelizing 
network reads and lake storage writes.");
 
-  public static final ConfigProperty<String> WRITE_DISRUPTOR_BUFFER_SIZE = 
ConfigProperty
+  public static final ConfigProperty<String> 
WRITE_EXECUTOR_DISRUPTOR_BUFFER_SIZE = ConfigProperty
       .key("hoodie.write.executor.disruptor.buffer.size")
       .defaultValue(String.valueOf(1024))
       .sinceVersion("0.13.0")
       .withDocumentation("The size of the Disruptor Executor ring buffer, must 
be power of 2");
 
-  public static final ConfigProperty<String> WRITE_WAIT_STRATEGY = 
ConfigProperty
+  public static final ConfigProperty<String> 
WRITE_EXECUTOR_DISRUPTOR_WAIT_STRATEGY = ConfigProperty
       .key("hoodie.write.executor.disruptor.wait.strategy")
-      .defaultValue("BLOCKING_WAIT")
+      .defaultValue(DisruptorWaitStrategyType.BLOCKING_WAIT.name())
       .sinceVersion("0.13.0")
       .withDocumentation("Strategy employed for making Disruptor Executor wait 
on a cursor. Other options are "
           + "SLEEPING_WAIT, it attempts to be conservative with CPU usage by 
using a simple busy wait loop"
@@ -1107,7 +1108,7 @@ public class HoodieWriteConfig extends HoodieConfig {
   }
 
   public ExecutorType getExecutorType() {
-    return 
ExecutorType.valueOf(getStringOrDefault(EXECUTOR_TYPE).toUpperCase(Locale.ROOT));
+    return 
ExecutorType.valueOf(getStringOrDefault(WRITE_EXECUTOR_TYPE).toUpperCase(Locale.ROOT));
   }
 
   public boolean isCDCEnabled() {
@@ -1175,12 +1176,12 @@ public class HoodieWriteConfig extends HoodieConfig {
     return 
Integer.parseInt(getStringOrDefault(WRITE_BUFFER_LIMIT_BYTES_VALUE));
   }
 
-  public Option<String> getWriteExecutorWaitStrategy() {
-    return Option.of(getString(WRITE_WAIT_STRATEGY));
+  public String getWriteExecutorDisruptorWaitStrategy() {
+    return getStringOrDefault(WRITE_EXECUTOR_DISRUPTOR_WAIT_STRATEGY);
   }
 
-  public Option<Integer> getDisruptorWriteBufferSize() {
-    return 
Option.of(Integer.parseInt(getStringOrDefault(WRITE_DISRUPTOR_BUFFER_SIZE)));
+  public Integer getWriteExecutorDisruptorWriteBufferSize() {
+    return 
Integer.parseInt(getStringOrDefault(WRITE_EXECUTOR_DISRUPTOR_BUFFER_SIZE));
   }
 
   public boolean shouldCombineBeforeInsert() {
@@ -1987,7 +1988,7 @@ public class HoodieWriteConfig extends HoodieConfig {
   public String getDatadogApiKey() {
     if (props.containsKey(HoodieMetricsDatadogConfig.API_KEY.key())) {
       return getString(HoodieMetricsDatadogConfig.API_KEY);
-      
+
     } else {
       Supplier<String> apiKeySupplier = ReflectionUtils.loadClass(
           getString(HoodieMetricsDatadogConfig.API_KEY_SUPPLIER));
@@ -2481,7 +2482,7 @@ public class HoodieWriteConfig extends HoodieConfig {
     }
 
     public Builder withExecutorType(String executorClass) {
-      writeConfig.setValue(EXECUTOR_TYPE, executorClass);
+      writeConfig.setValue(WRITE_EXECUTOR_TYPE, executorClass);
       return this;
     }
 
@@ -2536,13 +2537,13 @@ public class HoodieWriteConfig extends HoodieConfig {
       return this;
     }
 
-    public Builder withWriteWaitStrategy(String waitStrategy) {
-      writeConfig.setValue(WRITE_WAIT_STRATEGY, String.valueOf(waitStrategy));
+    public Builder withWriteExecutorDisruptorWaitStrategy(String waitStrategy) 
{
+      writeConfig.setValue(WRITE_EXECUTOR_DISRUPTOR_WAIT_STRATEGY, 
String.valueOf(waitStrategy));
       return this;
     }
 
-    public Builder withWriteBufferSize(int size) {
-      writeConfig.setValue(WRITE_DISRUPTOR_BUFFER_SIZE, String.valueOf(size));
+    public Builder withWriteExecutorDisruptorWriteBufferSize(long size) {
+      writeConfig.setValue(WRITE_EXECUTOR_DISRUPTOR_BUFFER_SIZE, 
String.valueOf(size));
       return this;
     }
 
@@ -2970,8 +2971,15 @@ public class HoodieWriteConfig extends HoodieConfig {
     }
 
     public HoodieWriteConfig build() {
+      return build(true);
+    }
+
+    @VisibleForTesting
+    public HoodieWriteConfig build(boolean shouldValidate) {
       setDefaults();
-      validate();
+      if (shouldValidate) {
+        validate();
+      }
       // Build WriteConfig at the end
       return new HoodieWriteConfig(engineType, writeConfig.getProps());
     }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/HoodieLazyInsertIterable.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/HoodieLazyInsertIterable.java
index 20f75f63c52..991e52982cf 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/HoodieLazyInsertIterable.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/HoodieLazyInsertIterable.java
@@ -21,9 +21,9 @@ package org.apache.hudi.execution;
 import org.apache.avro.Schema;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.utils.LazyIterableIterator;
-import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.queue.ExecutorType;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.io.CreateHandleFactory;
 import org.apache.hudi.io.WriteHandleFactory;
@@ -90,23 +90,25 @@ public abstract class HoodieLazyInsertIterable<T>
     }
   }
 
-  static <T> Function<HoodieRecord<T>, 
HoodieInsertValueGenResult<HoodieRecord>> getCloningTransformer(Schema schema,
-                                                                               
                        HoodieWriteConfig config) {
-    return getCloningTransformerInternal(schema, config.getProps());
+  static <T> Function<HoodieRecord<T>, 
HoodieInsertValueGenResult<HoodieRecord>> getTransformer(Schema schema,
+                                                                               
                 HoodieWriteConfig writeConfig) {
+    return getTransformerInternal(schema, writeConfig);
   }
 
-  static <T> Function<HoodieRecord<T>, 
HoodieInsertValueGenResult<HoodieRecord>> getCloningTransformer(Schema schema) {
-    return getCloningTransformerInternal(schema, new TypedProperties());
-  }
+  private static <T> Function<HoodieRecord<T>, 
HoodieInsertValueGenResult<HoodieRecord>> getTransformerInternal(Schema schema,
+                                                                               
                                 HoodieWriteConfig writeConfig) {
+    // NOTE: Whether record have to be cloned here is determined based on the 
executor type used
+    //       for writing: executors relying on an inner queue, will be keeping 
references to the records
+    //       and therefore in the environments where underlying buffer holding 
the record could be
+    //       reused (for ex, Spark) we need to make sure that we get a clean 
copy of
+    //       it since these records will be subsequently buffered (w/in the 
in-memory queue);
+    //       Only case when we don't need to make a copy is when using 
[[SimpleExecutor]] which
+    //       is guaranteed to not hold on to references to any records
+    boolean shouldClone = writeConfig.getExecutorType() != ExecutorType.SIMPLE;
 
-  private static <T> Function<HoodieRecord<T>, 
HoodieInsertValueGenResult<HoodieRecord>> getCloningTransformerInternal(Schema 
schema,
-                                                                               
                                        TypedProperties props) {
     return record -> {
-      // NOTE: Record have to be cloned here to make sure if it holds 
low-level engine-specific
-      //       payload pointing into a shared, mutable (underlying) buffer we 
get a clean copy of
-      //       it since these records will be subsequently buffered (w/in the 
in-memory queue)
-      HoodieRecord<T> clonedRecord = record.copy();
-      return new HoodieInsertValueGenResult(clonedRecord, schema, props);
+      HoodieRecord<T> clonedRecord = shouldClone ? record.copy() : record;
+      return new HoodieInsertValueGenResult(clonedRecord, schema, 
writeConfig.getProps());
     };
   }
 
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/ExecutorFactory.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/ExecutorFactory.java
index b9e7f06f848..bd192f649db 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/ExecutorFactory.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/ExecutorFactory.java
@@ -20,11 +20,11 @@ package org.apache.hudi.util;
 
 import org.apache.hudi.common.util.Functions;
 import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
-import org.apache.hudi.common.util.queue.SimpleHoodieExecutor;
 import org.apache.hudi.common.util.queue.DisruptorExecutor;
 import org.apache.hudi.common.util.queue.ExecutorType;
-import org.apache.hudi.common.util.queue.HoodieExecutor;
 import org.apache.hudi.common.util.queue.HoodieConsumer;
+import org.apache.hudi.common.util.queue.HoodieExecutor;
+import org.apache.hudi.common.util.queue.SimpleExecutor;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 
@@ -34,17 +34,17 @@ import java.util.function.Function;
 public class ExecutorFactory {
 
   public static <I, O, E> HoodieExecutor<E> create(HoodieWriteConfig 
hoodieConfig,
-                                                         Iterator<I> inputItr,
-                                                         HoodieConsumer<O, E> 
consumer,
-                                                         Function<I, O> 
transformFunction) {
+                                                   Iterator<I> inputItr,
+                                                   HoodieConsumer<O, E> 
consumer,
+                                                   Function<I, O> 
transformFunction) {
     return create(hoodieConfig, inputItr, consumer, transformFunction, 
Functions.noop());
   }
 
   public static <I, O, E> HoodieExecutor<E> create(HoodieWriteConfig 
hoodieConfig,
-                                                         Iterator<I> inputItr,
-                                                         HoodieConsumer<O, E> 
consumer,
-                                                         Function<I, O> 
transformFunction,
-                                                         Runnable 
preExecuteRunnable) {
+                                                   Iterator<I> inputItr,
+                                                   HoodieConsumer<O, E> 
consumer,
+                                                   Function<I, O> 
transformFunction,
+                                                   Runnable 
preExecuteRunnable) {
     ExecutorType executorType = hoodieConfig.getExecutorType();
 
     switch (executorType) {
@@ -52,10 +52,10 @@ public class ExecutorFactory {
         return new 
BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), inputItr, 
consumer,
             transformFunction, preExecuteRunnable);
       case DISRUPTOR:
-        return new 
DisruptorExecutor<>(hoodieConfig.getDisruptorWriteBufferSize(), inputItr, 
consumer,
-            transformFunction, hoodieConfig.getWriteExecutorWaitStrategy(), 
preExecuteRunnable);
+        return new 
DisruptorExecutor<>(hoodieConfig.getWriteExecutorDisruptorWriteBufferSize(), 
inputItr, consumer,
+            transformFunction, 
hoodieConfig.getWriteExecutorDisruptorWaitStrategy(), preExecuteRunnable);
       case SIMPLE:
-        return new SimpleHoodieExecutor<>(inputItr, consumer, 
transformFunction);
+        return new SimpleExecutor<>(inputItr, consumer, transformFunction);
       default:
         throw new HoodieException("Unsupported Executor Type " + executorType);
     }
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java
index 69797166249..6e573ec9432 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java
@@ -60,7 +60,7 @@ public class FlinkLazyInsertIterable<T> extends 
HoodieLazyInsertIterable<T> {
     try {
       final Schema schema = new 
Schema.Parser().parse(hoodieConfig.getSchema());
       bufferedIteratorExecutor = ExecutorFactory.create(hoodieConfig, 
inputItr, getExplicitInsertHandler(),
-          getCloningTransformer(schema, hoodieConfig));
+          getTransformer(schema, hoodieConfig));
       final List<WriteStatus> result = bufferedIteratorExecutor.execute();
       checkState(result != null && !result.isEmpty());
       return result;
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java
index 5113b340680..d2e813a506b 100644
--- 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java
@@ -64,7 +64,7 @@ public class JavaLazyInsertIterable<T> extends 
HoodieLazyInsertIterable<T> {
     try {
       final Schema schema = new 
Schema.Parser().parse(hoodieConfig.getSchema());
       bufferedIteratorExecutor =
-          ExecutorFactory.create(hoodieConfig, inputItr, getInsertHandler(), 
getCloningTransformer(schema));
+          ExecutorFactory.create(hoodieConfig, inputItr, getInsertHandler(), 
getTransformer(schema, hoodieConfig));
       final List<WriteStatus> result = bufferedIteratorExecutor.execute();
       checkState(result != null && !result.isEmpty());
       return result;
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java
index 7cb2b27e171..147b5cf6b33 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java
@@ -87,7 +87,7 @@ public class SparkLazyInsertIterable<T> extends 
HoodieLazyInsertIterable<T> {
       }
 
       bufferedIteratorExecutor = ExecutorFactory.create(hoodieConfig, 
inputItr, getInsertHandler(),
-          getCloningTransformer(schema, hoodieConfig), 
hoodieTable.getPreExecuteRunnable());
+          getTransformer(schema, hoodieConfig), 
hoodieTable.getPreExecuteRunnable());
 
       final List<WriteStatus> result = bufferedIteratorExecutor.execute();
       checkState(result != null && !result.isEmpty());
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java
index 3ccdf1ec106..eb61cb43312 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java
@@ -23,6 +23,7 @@ import 
org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
+import org.apache.hudi.common.util.queue.ExecutorType;
 import org.apache.hudi.common.util.queue.HoodieConsumer;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
@@ -41,18 +42,21 @@ import java.util.List;
 
 import scala.Tuple2;
 
-import static 
org.apache.hudi.execution.HoodieLazyInsertIterable.getCloningTransformer;
+import static 
org.apache.hudi.execution.HoodieLazyInsertIterable.getTransformer;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 public class TestBoundedInMemoryExecutorInSpark extends 
HoodieClientTestHarness {
 
   private final String instantTime = 
HoodieActiveTimeline.createNewInstantTime();
 
+  private final HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
+      .withExecutorType(ExecutorType.BOUNDED_IN_MEMORY.name())
+      .withWriteBufferLimitBytes(1024)
+      .build(false);
+
   @BeforeEach
   public void setUp() throws Exception {
     initTestDataGenerator();
@@ -74,8 +78,6 @@ public class TestBoundedInMemoryExecutorInSpark extends 
HoodieClientTestHarness
     final int recordNumber = 100;
     final List<HoodieRecord> hoodieRecords = 
dataGen.generateInserts(instantTime, recordNumber);
 
-    HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class);
-    when(hoodieWriteConfig.getWriteBufferLimitBytes()).thenReturn(1024);
     
HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
 Integer> consumer =
         new 
HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
 Integer>() {
 
@@ -94,8 +96,8 @@ public class TestBoundedInMemoryExecutorInSpark extends 
HoodieClientTestHarness
 
     BoundedInMemoryExecutor<HoodieRecord, Tuple2<HoodieRecord, 
Option<IndexedRecord>>, Integer> executor = null;
     try {
-      executor = new 
BoundedInMemoryExecutor(hoodieWriteConfig.getWriteBufferLimitBytes(), 
hoodieRecords.iterator(), consumer,
-          getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA), 
getPreExecuteRunnable());
+      executor = new 
BoundedInMemoryExecutor(writeConfig.getWriteBufferLimitBytes(), 
hoodieRecords.iterator(), consumer,
+          getTransformer(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig), 
getPreExecuteRunnable());
       int result = executor.execute();
 
       assertEquals(100, result);
@@ -113,8 +115,6 @@ public class TestBoundedInMemoryExecutorInSpark extends 
HoodieClientTestHarness
   public void testInterruptExecutor() {
     final List<HoodieRecord> hoodieRecords = 
dataGen.generateInserts(instantTime, 100);
 
-    HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class);
-    when(hoodieWriteConfig.getWriteBufferLimitBytes()).thenReturn(1024);
     
HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
 Integer> consumer =
         new 
HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
 Integer>() {
 
@@ -136,8 +136,8 @@ public class TestBoundedInMemoryExecutorInSpark extends 
HoodieClientTestHarness
         };
 
     BoundedInMemoryExecutor<HoodieRecord, Tuple2<HoodieRecord, 
Option<IndexedRecord>>, Integer> executor =
-        new 
BoundedInMemoryExecutor(hoodieWriteConfig.getWriteBufferLimitBytes(), 
hoodieRecords.iterator(), consumer,
-            getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA), 
getPreExecuteRunnable());
+        new BoundedInMemoryExecutor(writeConfig.getWriteBufferLimitBytes(), 
hoodieRecords.iterator(), consumer,
+            getTransformer(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig), 
getPreExecuteRunnable());
 
     // Interrupt the current thread (therefore triggering executor to throw as 
soon as it
     // invokes [[get]] on the [[CompletableFuture]])
@@ -154,8 +154,6 @@ public class TestBoundedInMemoryExecutorInSpark extends 
HoodieClientTestHarness
 
   @Test
   public void testExecutorTermination() {
-    HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class);
-    when(hoodieWriteConfig.getWriteBufferLimitBytes()).thenReturn(1024);
     Iterator<GenericRecord> unboundedRecordIter = new 
Iterator<GenericRecord>() {
       @Override
       public boolean hasNext() {
@@ -181,8 +179,8 @@ public class TestBoundedInMemoryExecutorInSpark extends 
HoodieClientTestHarness
         };
 
     BoundedInMemoryExecutor<HoodieRecord, Tuple2<HoodieRecord, 
Option<IndexedRecord>>, Integer> executor =
-        new 
BoundedInMemoryExecutor(hoodieWriteConfig.getWriteBufferLimitBytes(), 
unboundedRecordIter,
-            consumer, 
getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA),
+        new BoundedInMemoryExecutor(writeConfig.getWriteBufferLimitBytes(), 
unboundedRecordIter,
+            consumer, getTransformer(HoodieTestDataGenerator.AVRO_SCHEMA, 
writeConfig),
             getPreExecuteRunnable());
     executor.shutdownNow();
     boolean terminatedGracefully = executor.awaitTermination();
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java
index c9be18c9da3..50ba44c5688 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java
@@ -27,9 +27,11 @@ import org.apache.hudi.common.util.FileIOUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.SizeEstimator;
 import org.apache.hudi.common.util.queue.BoundedInMemoryQueue;
+import org.apache.hudi.common.util.queue.ExecutorType;
 import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer;
 import org.apache.hudi.common.util.queue.HoodieProducer;
 import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer;
+import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.testutils.HoodieClientTestHarness;
 
@@ -54,7 +56,7 @@ import java.util.stream.IntStream;
 
 import scala.Tuple2;
 
-import static 
org.apache.hudi.execution.HoodieLazyInsertIterable.getCloningTransformer;
+import static 
org.apache.hudi.execution.HoodieLazyInsertIterable.getTransformer;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -65,6 +67,11 @@ public class TestBoundedInMemoryQueue extends 
HoodieClientTestHarness {
 
   private final String instantTime = 
HoodieActiveTimeline.createNewInstantTime();
 
+  private final HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
+      .withExecutorType(ExecutorType.BOUNDED_IN_MEMORY.name())
+      .withWriteBufferLimitBytes(1024)
+      .build(false);
+
   @BeforeEach
   public void setUp() throws Exception {
     initTestDataGenerator();
@@ -85,7 +92,7 @@ public class TestBoundedInMemoryQueue extends 
HoodieClientTestHarness {
     final int numRecords = 128;
     final List<HoodieRecord> hoodieRecords = 
dataGen.generateInserts(instantTime, numRecords);
     final BoundedInMemoryQueue<HoodieRecord, 
HoodieLazyInsertIterable.HoodieInsertValueGenResult> queue =
-        new BoundedInMemoryQueue(FileIOUtils.KB, 
getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA));
+        new BoundedInMemoryQueue(FileIOUtils.KB, 
getTransformer(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig));
     // Produce
     Future<Boolean> resFuture = executorService.submit(() -> {
       new 
IteratorBasedQueueProducer<>(hoodieRecords.iterator()).produce(queue);
@@ -125,7 +132,7 @@ public class TestBoundedInMemoryQueue extends 
HoodieClientTestHarness {
     final List<List<HoodieRecord>> recs = new ArrayList<>();
 
     final BoundedInMemoryQueue<HoodieRecord, 
HoodieLazyInsertIterable.HoodieInsertValueGenResult> queue =
-        new BoundedInMemoryQueue(FileIOUtils.KB, 
getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA));
+        new BoundedInMemoryQueue(FileIOUtils.KB, 
getTransformer(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig));
 
     // Record Key to <Producer Index, Rec Index within a producer>
     Map<String, Tuple2<Integer, Integer>> keyToProducerAndIndexMap = new 
HashMap<>();
@@ -220,11 +227,11 @@ public class TestBoundedInMemoryQueue extends 
HoodieClientTestHarness {
     final int recordLimit = 5;
     final SizeEstimator<HoodieLazyInsertIterable.HoodieInsertValueGenResult> 
sizeEstimator = new DefaultSizeEstimator<>();
     HoodieLazyInsertIterable.HoodieInsertValueGenResult genResult =
-        
getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA).apply((HoodieAvroRecord)
 hoodieRecords.get(0));
+        getTransformer(HoodieTestDataGenerator.AVRO_SCHEMA, 
writeConfig).apply(hoodieRecords.get(0));
     final long objSize = sizeEstimator.sizeEstimate(genResult);
     final long memoryLimitInBytes = recordLimit * objSize;
     final BoundedInMemoryQueue<HoodieRecord, 
HoodieLazyInsertIterable.HoodieInsertValueGenResult> queue =
-        new BoundedInMemoryQueue(memoryLimitInBytes, 
getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA));
+        new BoundedInMemoryQueue(memoryLimitInBytes, 
getTransformer(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig));
 
     // Produce
     executorService.submit(() -> {
@@ -269,7 +276,7 @@ public class TestBoundedInMemoryQueue extends 
HoodieClientTestHarness {
     final SizeEstimator<Tuple2<HoodieRecord, Option<IndexedRecord>>> 
sizeEstimator = new DefaultSizeEstimator<>();
     // queue memory limit
     HoodieLazyInsertIterable.HoodieInsertValueGenResult genResult =
-        
getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA).apply(hoodieRecords.get(0));
+        getTransformer(HoodieTestDataGenerator.AVRO_SCHEMA, 
writeConfig).apply(hoodieRecords.get(0));
     final long objSize = sizeEstimator.sizeEstimate(new 
Tuple2(genResult.getResult(), 
genResult.getResult().toIndexedRecord(HoodieTestDataGenerator.AVRO_SCHEMA, new 
Properties())));
     final long memoryLimitInBytes = 4 * objSize;
 
@@ -277,7 +284,7 @@ public class TestBoundedInMemoryQueue extends 
HoodieClientTestHarness {
     // stops and throws
     // correct exception back.
     BoundedInMemoryQueue<HoodieRecord, Tuple2<HoodieRecord, 
Option<IndexedRecord>>> queue1 =
-        new BoundedInMemoryQueue(memoryLimitInBytes, 
getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA));
+        new BoundedInMemoryQueue(memoryLimitInBytes, 
getTransformer(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig));
 
     // Produce
     Future<Boolean> resFuture = executorService.submit(() -> {
@@ -305,7 +312,7 @@ public class TestBoundedInMemoryQueue extends 
HoodieClientTestHarness {
     when(mockHoodieRecordsIterator.hasNext()).thenReturn(true);
     when(mockHoodieRecordsIterator.next()).thenThrow(expectedException);
     BoundedInMemoryQueue<HoodieRecord, Tuple2<HoodieRecord, 
Option<IndexedRecord>>> queue2 =
-        new BoundedInMemoryQueue(memoryLimitInBytes, 
getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA));
+        new BoundedInMemoryQueue(memoryLimitInBytes, 
getTransformer(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig));
 
     // Produce
     Future<Boolean> res = executorService.submit(() -> {
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorExecutionInSpark.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorExecutionInSpark.java
index 674aca74153..55c2325b137 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorExecutionInSpark.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorExecutionInSpark.java
@@ -18,13 +18,11 @@
 
 package org.apache.hudi.execution;
 
-import org.apache.avro.generic.IndexedRecord;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
-import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.queue.HoodieConsumer;
 import org.apache.hudi.common.util.queue.DisruptorExecutor;
+import org.apache.hudi.common.util.queue.ExecutorType;
+import org.apache.hudi.common.util.queue.HoodieConsumer;
 import org.apache.hudi.common.util.queue.WaitStrategyFactory;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
@@ -38,21 +36,23 @@ import org.junit.jupiter.api.Timeout;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.function.Function;
 
-import scala.Tuple2;
-
-import static 
org.apache.hudi.execution.HoodieLazyInsertIterable.getCloningTransformer;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 public class TestDisruptorExecutionInSpark extends HoodieClientTestHarness {
 
   private final String instantTime = 
HoodieActiveTimeline.createNewInstantTime();
 
+
+  private final HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
+      .withExecutorType(ExecutorType.DISRUPTOR.name())
+      .withWriteExecutorDisruptorWriteBufferSize(8)
+      .build(false);
+
   @BeforeEach
   public void setUp() throws Exception {
     initTestDataGenerator();
@@ -75,16 +75,14 @@ public class TestDisruptorExecutionInSpark extends 
HoodieClientTestHarness {
     final List<HoodieRecord> hoodieRecords = 
dataGen.generateInserts(instantTime, 128);
     final List<HoodieRecord> consumedRecords = new ArrayList<>();
 
-    HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class);
-    
when(hoodieWriteConfig.getDisruptorWriteBufferSize()).thenReturn(Option.of(8));
-    
HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
 Integer> consumer =
-        new 
HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
 Integer>() {
+    HoodieConsumer<HoodieRecord, Integer> consumer =
+        new HoodieConsumer<HoodieRecord, Integer>() {
 
           private int count = 0;
 
           @Override
-          public void 
consume(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord> 
record) {
-            consumedRecords.add(record.getResult());
+          public void consume(HoodieRecord record) {
+            consumedRecords.add(record);
             count++;
           }
 
@@ -93,11 +91,11 @@ public class TestDisruptorExecutionInSpark extends 
HoodieClientTestHarness {
             return count;
           }
         };
-    DisruptorExecutor<HoodieRecord, Tuple2<HoodieRecord, 
Option<IndexedRecord>>, Integer> exec = null;
+    DisruptorExecutor<HoodieRecord, HoodieRecord, Integer> exec = null;
 
     try {
-      exec = new 
DisruptorExecutor(hoodieWriteConfig.getDisruptorWriteBufferSize(), 
hoodieRecords.iterator(), consumer,
-          getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA), 
Option.of(WaitStrategyFactory.DEFAULT_STRATEGY), getPreExecuteRunnable());
+      exec = new 
DisruptorExecutor<>(writeConfig.getWriteExecutorDisruptorWriteBufferSize(), 
hoodieRecords.iterator(), consumer,
+          Function.identity(), WaitStrategyFactory.DEFAULT_STRATEGY, 
getPreExecuteRunnable());
       int result = exec.execute();
       // It should buffer and write 100 records
       assertEquals(128, result);
@@ -123,13 +121,11 @@ public class TestDisruptorExecutionInSpark extends 
HoodieClientTestHarness {
   public void testInterruptExecutor() {
     final List<HoodieRecord> hoodieRecords = 
dataGen.generateInserts(instantTime, 100);
 
-    HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class);
-    
when(hoodieWriteConfig.getDisruptorWriteBufferSize()).thenReturn(Option.of(1024));
-    
HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
 Integer> consumer =
-        new 
HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
 Integer>() {
+    HoodieConsumer<HoodieRecord, Integer> consumer =
+        new HoodieConsumer<HoodieRecord, Integer>() {
 
           @Override
-          public void 
consume(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord> 
record) {
+          public void consume(HoodieRecord record) {
             try {
               Thread.currentThread().wait();
             } catch (InterruptedException ie) {
@@ -143,9 +139,9 @@ public class TestDisruptorExecutionInSpark extends 
HoodieClientTestHarness {
           }
         };
 
-    DisruptorExecutor<HoodieRecord, Tuple2<HoodieRecord, 
Option<IndexedRecord>>, Integer>
-        executor = new 
DisruptorExecutor(hoodieWriteConfig.getDisruptorWriteBufferSize(), 
hoodieRecords.iterator(), consumer,
-        getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA), 
Option.of(WaitStrategyFactory.DEFAULT_STRATEGY), getPreExecuteRunnable());
+    DisruptorExecutor<HoodieRecord, HoodieRecord, Integer>
+        executor = new DisruptorExecutor<>(1024, hoodieRecords.iterator(), 
consumer,
+        Function.identity(), WaitStrategyFactory.DEFAULT_STRATEGY, 
getPreExecuteRunnable());
 
     try {
       Thread.currentThread().interrupt();
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorMessageQueue.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorMessageQueue.java
index 2ff9d02926b..7344ccd89fd 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorMessageQueue.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorMessageQueue.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.common.util.queue.DisruptorExecutor;
 import org.apache.hudi.common.util.queue.DisruptorMessageQueue;
+import org.apache.hudi.common.util.queue.ExecutorType;
 import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer;
 import org.apache.hudi.common.util.queue.HoodieConsumer;
 import org.apache.hudi.common.util.queue.HoodieProducer;
@@ -56,17 +57,20 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static org.apache.hudi.exception.ExceptionUtil.getRootCause;
-import static 
org.apache.hudi.execution.HoodieLazyInsertIterable.getCloningTransformer;
+import static 
org.apache.hudi.execution.HoodieLazyInsertIterable.getTransformer;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 public class TestDisruptorMessageQueue extends HoodieClientTestHarness {
 
   private final String instantTime = 
HoodieActiveTimeline.createNewInstantTime();
 
+  private final HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
+      .withExecutorType(ExecutorType.DISRUPTOR.name())
+      .withWriteExecutorDisruptorWriteBufferSize(16)
+      .build(false);
+
   @BeforeEach
   public void setUp() throws Exception {
     initTestDataGenerator();
@@ -108,8 +112,6 @@ public class TestDisruptorMessageQueue extends 
HoodieClientTestHarness {
       }
     });
 
-    HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class);
-    
when(hoodieWriteConfig.getDisruptorWriteBufferSize()).thenReturn(Option.of(16));
     
HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
 Integer> consumer =
         new 
HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
 Integer>() {
 
@@ -137,8 +139,8 @@ public class TestDisruptorMessageQueue extends 
HoodieClientTestHarness {
     DisruptorExecutor<HoodieRecord, Tuple2<HoodieRecord, 
Option<IndexedRecord>>, Integer> exec = null;
 
     try {
-      exec = new 
DisruptorExecutor(hoodieWriteConfig.getDisruptorWriteBufferSize(), 
hoodieRecords.iterator(), consumer,
-          getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA), 
Option.of(WaitStrategyFactory.DEFAULT_STRATEGY), getPreExecuteRunnable());
+      exec = new 
DisruptorExecutor(writeConfig.getWriteExecutorDisruptorWriteBufferSize(), 
hoodieRecords.iterator(), consumer,
+          getTransformer(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig), 
WaitStrategyFactory.DEFAULT_STRATEGY, getPreExecuteRunnable());
       int result = exec.execute();
       // It should buffer and write 100 records
       assertEquals(100, result);
@@ -167,8 +169,8 @@ public class TestDisruptorMessageQueue extends 
HoodieClientTestHarness {
     final List<List<HoodieRecord>> recs = new ArrayList<>();
 
     final DisruptorMessageQueue<HoodieRecord, 
HoodieLazyInsertIterable.HoodieInsertValueGenResult> queue =
-        new DisruptorMessageQueue(Option.of(1024), 
getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA),
-            Option.of("BLOCKING_WAIT"), numProducers, new Runnable() {
+        new DisruptorMessageQueue(1024, 
getTransformer(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig),
+            "BLOCKING_WAIT", numProducers, new Runnable() {
               @Override
           public void run() {
                 // do nothing.
@@ -282,8 +284,8 @@ public class TestDisruptorMessageQueue extends 
HoodieClientTestHarness {
     final int numProducers = 40;
 
     final DisruptorMessageQueue<HoodieRecord, 
HoodieLazyInsertIterable.HoodieInsertValueGenResult> queue =
-        new DisruptorMessageQueue(Option.of(1024), 
getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA),
-            Option.of("BLOCKING_WAIT"), numProducers, new Runnable() {
+        new DisruptorMessageQueue(1024, 
getTransformer(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig),
+            "BLOCKING_WAIT", numProducers, new Runnable() {
               @Override
           public void run() {
               // do nothing.
@@ -324,9 +326,9 @@ public class TestDisruptorMessageQueue extends 
HoodieClientTestHarness {
       }
     };
 
-    DisruptorExecutor<HoodieRecord, Tuple2<HoodieRecord, 
Option<IndexedRecord>>, Integer> exec = new DisruptorExecutor(Option.of(1024),
-        producers, consumer, 
getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA),
-        Option.of(WaitStrategyFactory.DEFAULT_STRATEGY), 
getPreExecuteRunnable());
+    DisruptorExecutor<HoodieRecord, Tuple2<HoodieRecord, 
Option<IndexedRecord>>, Integer> exec = new DisruptorExecutor(1024,
+        producers, consumer, 
getTransformer(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig),
+        WaitStrategyFactory.DEFAULT_STRATEGY, getPreExecuteRunnable());
 
     final Throwable thrown = assertThrows(HoodieException.class, exec::execute,
         "exception is expected");
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestSimpleExecutionInSpark.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestSimpleExecutionInSpark.java
index bbc85efd376..830577463b2 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestSimpleExecutionInSpark.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestSimpleExecutionInSpark.java
@@ -18,8 +18,6 @@
 
 package org.apache.hudi.execution;
 
-import static 
org.apache.hudi.execution.HoodieLazyInsertIterable.getCloningTransformer;
-
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -27,8 +25,7 @@ import 
org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.queue.HoodieConsumer;
-import org.apache.hudi.common.util.queue.SimpleHoodieExecutor;
-import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.common.util.queue.SimpleExecutor;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.testutils.HoodieClientTestHarness;
 import org.junit.jupiter.api.AfterEach;
@@ -41,16 +38,13 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
-
-import scala.Tuple2;
+import java.util.function.Function;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
-public class TestSimpleExecutionInSpark extends HoodieClientTestHarness {
+public class TestSimpleExecutionInSpark extends HoodieClientTestHarness {
 
   private final String instantTime = 
HoodieActiveTimeline.createNewInstantTime();
 
@@ -71,15 +65,13 @@ public class TestSimpleExecutionInSpark extends 
HoodieClientTestHarness {
     final List<HoodieRecord> hoodieRecords = 
dataGen.generateInserts(instantTime, 128);
     final List<HoodieRecord> consumedRecords = new ArrayList<>();
 
-    HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class);
-    
when(hoodieWriteConfig.getDisruptorWriteBufferSize()).thenReturn(Option.of(8));
-    
HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
 Integer> consumer =
-        new 
HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
 Integer>() {
+    HoodieConsumer<HoodieRecord, Integer> consumer =
+        new HoodieConsumer<HoodieRecord, Integer>() {
           private int count = 0;
 
           @Override
-          public void 
consume(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord> 
record) throws Exception {
-            consumedRecords.add(record.getResult());
+          public void consume(HoodieRecord record) throws Exception {
+            consumedRecords.add(record);
             count++;
           }
 
@@ -88,10 +80,10 @@ public class TestSimpleExecutionInSpark extends 
HoodieClientTestHarness {
             return count;
           }
         };
-    SimpleHoodieExecutor<HoodieRecord, Tuple2<HoodieRecord, 
Option<IndexedRecord>>, Integer> exec = null;
+    SimpleExecutor<HoodieRecord, HoodieRecord, Integer> exec = null;
 
     try {
-      exec = new SimpleHoodieExecutor(hoodieRecords.iterator(), consumer, 
getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA));
+      exec = new SimpleExecutor<>(hoodieRecords.iterator(), consumer, 
Function.identity());
 
       int result = exec.execute();
       // It should buffer and write 128 records
@@ -133,16 +125,16 @@ public class TestSimpleExecutionInSpark extends 
HoodieClientTestHarness {
       }
     });
 
-    
HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
 Integer> consumer =
-        new 
HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
 Integer>() {
+    HoodieConsumer<HoodieRecord, Integer> consumer =
+        new HoodieConsumer<HoodieRecord, Integer>() {
           private int count = 0;
 
           @Override
-          public void 
consume(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord> 
record) throws Exception {
+          public void consume(HoodieRecord record) throws Exception {
             count++;
-            afterRecord.add((HoodieAvroRecord) record.getResult());
+            afterRecord.add((HoodieAvroRecord) record);
             try {
-              IndexedRecord indexedRecord = (IndexedRecord)((HoodieAvroRecord) 
record.getResult())
+              IndexedRecord indexedRecord = (IndexedRecord)((HoodieAvroRecord) 
record)
                   
.getData().getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA).get();
               afterIndexedRecord.add(indexedRecord);
             } catch (IOException e) {
@@ -156,10 +148,10 @@ public class TestSimpleExecutionInSpark extends 
HoodieClientTestHarness {
           }
         };
 
-    SimpleHoodieExecutor<HoodieRecord, Tuple2<HoodieRecord, 
Option<IndexedRecord>>, Integer> exec = null;
+    SimpleExecutor<HoodieRecord, HoodieRecord, Integer> exec = null;
 
     try {
-      exec = new SimpleHoodieExecutor(hoodieRecords.iterator(), consumer, 
getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA));
+      exec = new SimpleExecutor<>(hoodieRecords.iterator(), consumer, 
Function.identity());
       int result = exec.execute();
       assertEquals(100, result);
 
@@ -186,14 +178,13 @@ public class TestSimpleExecutionInSpark extends 
HoodieClientTestHarness {
     List<HoodieRecord> pRecs = dataGen.generateInserts(instantTime, 
numRecords);
     InnerIterator iterator = new InnerIterator(pRecs.iterator(), errorMessage, 
numRecords / 10);
 
-    
HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
 Integer> consumer =
-        new 
HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
 Integer>() {
+    HoodieConsumer<HoodieRecord, Integer> consumer =
+        new HoodieConsumer<HoodieRecord, Integer>() {
           int count = 0;
 
           @Override
-          public void 
consume(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord> 
payload) throws Exception {
+          public void consume(HoodieRecord payload) throws Exception {
             // Read recs and ensure we have covered all producer recs.
-            final HoodieRecord rec = payload.getResult();
             count++;
           }
 
@@ -203,8 +194,8 @@ public class TestSimpleExecutionInSpark extends 
HoodieClientTestHarness {
           }
         };
 
-    SimpleHoodieExecutor<HoodieRecord, Tuple2<HoodieRecord, 
Option<IndexedRecord>>, Integer> exec =
-        new SimpleHoodieExecutor(iterator, consumer, 
getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA));
+    SimpleExecutor<HoodieRecord, HoodieRecord, Integer> exec =
+        new SimpleExecutor<>(iterator, consumer, Function.identity());
 
     final Throwable thrown = assertThrows(HoodieException.class, exec::execute,
         "exception is expected");
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java
index c379a7abcc0..20e5af1a91c 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java
@@ -61,7 +61,7 @@ public class BoundedInMemoryExecutor<I, O, E> extends 
BaseHoodieQueueBasedExecut
       }
       LOG.info("All records from the queue have been consumed");
     } catch (Exception e) {
-      LOG.error("Error consuming records", e);
+      LOG.error("Failed consuming records", e);
       queue.markAsFailed(e);
       throw new HoodieException(e);
     }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorExecutor.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorExecutor.java
index 4ebbd6e528b..056c45a0bf4 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorExecutor.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorExecutor.java
@@ -32,16 +32,23 @@ import java.util.function.Function;
  */
 public class DisruptorExecutor<I, O, E> extends 
BaseHoodieQueueBasedExecutor<I, O, E> {
 
-  public DisruptorExecutor(final Option<Integer> bufferSize, final Iterator<I> 
inputItr,
-                           HoodieConsumer<O, E> consumer, Function<I, O> 
transformFunction, Option<String> waitStrategy, Runnable preExecuteRunnable) {
+  public DisruptorExecutor(Integer bufferSize,
+                           Iterator<I> inputItr,
+                           HoodieConsumer<O, E> consumer,
+                           Function<I, O> transformFunction,
+                           String waitStrategy,
+                           Runnable preExecuteRunnable) {
     this(bufferSize, Collections.singletonList(new 
IteratorBasedQueueProducer<>(inputItr)), consumer,
         transformFunction, waitStrategy, preExecuteRunnable);
   }
 
-  public DisruptorExecutor(final Option<Integer> bufferSize, 
List<HoodieProducer<I>> producers,
-                           HoodieConsumer<O, E> consumer, final Function<I, O> 
transformFunction,
-                           final Option<String> waitStrategy, Runnable 
preExecuteRunnable) {
-    super(producers, Option.of(consumer), new 
DisruptorMessageQueue<>(bufferSize, transformFunction, waitStrategy, 
producers.size(), preExecuteRunnable), preExecuteRunnable);
+  public DisruptorExecutor(int bufferSize,
+                           List<HoodieProducer<I>> producers,
+                           HoodieConsumer<O, E> consumer,
+                           Function<I, O> transformFunction,
+                           String waitStrategyId,
+                           Runnable preExecuteRunnable) {
+    super(producers, Option.of(consumer), new 
DisruptorMessageQueue<>(bufferSize, transformFunction, waitStrategyId, 
producers.size(), preExecuteRunnable), preExecuteRunnable);
   }
 
   @Override
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorMessageQueue.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorMessageQueue.java
index 19b0a04b5e8..de4ecff9ae4 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorMessageQueue.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorMessageQueue.java
@@ -27,6 +27,8 @@ import com.lmax.disruptor.RingBuffer;
 import com.lmax.disruptor.WaitStrategy;
 import com.lmax.disruptor.dsl.Disruptor;
 import com.lmax.disruptor.dsl.ProducerType;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 
 import java.util.function.Function;
 
@@ -38,6 +40,8 @@ import java.util.function.Function;
  */
 public class DisruptorMessageQueue<I, O> implements HoodieMessageQueue<I, O> {
 
+  private static final Logger LOG = 
LogManager.getLogger(DisruptorMessageQueue.class);
+
   private final Disruptor<HoodieDisruptorEvent> queue;
   private final Function<I, O> transformFunction;
   private final RingBuffer<HoodieDisruptorEvent> ringBuffer;
@@ -45,11 +49,11 @@ public class DisruptorMessageQueue<I, O> implements 
HoodieMessageQueue<I, O> {
   private boolean isShutdown = false;
   private boolean isStarted = false;
 
-  public DisruptorMessageQueue(Option<Integer> bufferSize, Function<I, O> 
transformFunction, Option<String> waitStrategyName, int totalProducers, 
Runnable preExecuteRunnable) {
-    WaitStrategy waitStrategy = WaitStrategyFactory.build(waitStrategyName);
+  public DisruptorMessageQueue(int bufferSize, Function<I, O> 
transformFunction, String waitStrategyId, int totalProducers, Runnable 
preExecuteRunnable) {
+    WaitStrategy waitStrategy = WaitStrategyFactory.build(waitStrategyId);
     CustomizedThreadFactory threadFactory = new 
CustomizedThreadFactory("disruptor", true, preExecuteRunnable);
 
-    this.queue = new Disruptor<>(HoodieDisruptorEvent::new, bufferSize.get(), 
threadFactory, totalProducers > 1 ? ProducerType.MULTI : ProducerType.SINGLE, 
waitStrategy);
+    this.queue = new Disruptor<>(HoodieDisruptorEvent::new, bufferSize, 
threadFactory, totalProducers > 1 ? ProducerType.MULTI : ProducerType.SINGLE, 
waitStrategy);
     this.ringBuffer = queue.getRingBuffer();
     this.transformFunction = transformFunction;
   }
@@ -103,9 +107,13 @@ public class DisruptorMessageQueue<I, O> implements 
HoodieMessageQueue<I, O> {
     }
   }
 
-  protected void setHandlers(HoodieConsumer consumer) {
+  protected void setHandlers(HoodieConsumer<O, ?> consumer) {
     queue.handleEventsWith((event, sequence, endOfBatch) -> {
-      consumer.consume(event.get());
+      try {
+        consumer.consume(event.get());
+      } catch (Exception e) {
+        LOG.error("Failed consuming records", e);
+      }
     });
   }
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/ExecutorType.java 
b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/ExecutorType.java
index 43b2f061030..77004173423 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/ExecutorType.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/ExecutorType.java
@@ -18,12 +18,6 @@
 
 package org.apache.hudi.common.util.queue;
 
-import org.apache.hudi.keygen.constant.KeyGeneratorType;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
 /**
  * Types of {@link org.apache.hudi.common.util.queue.HoodieExecutor}.
  */
@@ -46,12 +40,5 @@ public enum ExecutorType {
    * The disadvantage is that the executor is a single-write-single-read 
model, cannot support functions such as speed limit
    * and can not de-coupe the network read (shuffle read) and network write 
(writing objects/files to storage) anymore.
    */
-  SIMPLE;
-
-  public static List<String> getNames() {
-    List<String> names = new ArrayList<>(ExecutorType.values().length);
-    Arrays.stream(KeyGeneratorType.values())
-        .forEach(x -> names.add(x.name()));
-    return names;
-  }
+  SIMPLE
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/SimpleHoodieExecutor.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/SimpleExecutor.java
similarity index 53%
rename from 
hudi-common/src/main/java/org/apache/hudi/common/util/queue/SimpleHoodieExecutor.java
rename to 
hudi-common/src/main/java/org/apache/hudi/common/util/queue/SimpleExecutor.java
index 8d9fcc892be..e175c22d930 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/SimpleHoodieExecutor.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/SimpleExecutor.java
@@ -18,9 +18,6 @@
 
 package org.apache.hudi.common.util.queue;
 
-import static org.apache.hudi.common.util.ValidationUtils.checkState;
-
-import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -29,31 +26,31 @@ import java.util.Iterator;
 import java.util.function.Function;
 
 /**
- * Single Writer and Single Reader mode. Also this SimpleHoodieExecutor has no 
inner message queue and no inner lock.
- * Consuming and writing records from iterator directly.
+ * Simple implementation of the {@link HoodieExecutor} interface assuming 
single-writer/single-reader
+ * mode allowing it to consume from the input {@link Iterator} directly 
avoiding the need for
+ * any internal materialization (ie queueing).
  *
- * Compared with queue based Executor
- * Advantages: there is no need for additional memory and cpu resources due to 
lock or multithreading.
- * Disadvantages: lost some benefits such as speed limit. And maybe lower 
throughput.
+ * <p>
+ * Such executor is aimed primarily at allowing
+ * the production-consumption chain to run w/ as little overhead as possible, 
at the expense of
+ * limited parallelism and therefore throughput, which is not an issue for 
execution environments
+ * such as Spark, where it's used primarily in a parallelism constraint 
environment (on executors)
  */
-public class SimpleHoodieExecutor<I, O, E> implements HoodieExecutor<E> {
+public class SimpleExecutor<I, O, E> implements HoodieExecutor<E> {
 
-  private static final Logger LOG = 
LogManager.getLogger(SimpleHoodieExecutor.class);
+  private static final Logger LOG = LogManager.getLogger(SimpleExecutor.class);
 
+  // Record iterator (producer)
+  private final Iterator<I> itr;
   // Consumer
-  protected final Option<HoodieConsumer<O, E>> consumer;
-  // records iterator
-  protected final Iterator<I> it;
-  private final Function<I, O> transformFunction;
+  private final HoodieConsumer<O, E> consumer;
 
-  public SimpleHoodieExecutor(final Iterator<I> inputItr, HoodieConsumer<O, E> 
consumer,
-                              Function<I, O> transformFunction) {
-    this(inputItr, Option.of(consumer), transformFunction);
-  }
+  private final Function<I, O> transformFunction;
 
-  public SimpleHoodieExecutor(final Iterator<I> inputItr, 
Option<HoodieConsumer<O, E>> consumer,
-                              Function<I, O> transformFunction) {
-    this.it = inputItr;
+  public SimpleExecutor(Iterator<I> inputItr,
+                        HoodieConsumer<O, E> consumer,
+                        Function<I, O> transformFunction) {
+    this.itr = inputItr;
     this.consumer = consumer;
     this.transformFunction = transformFunction;
   }
@@ -63,18 +60,16 @@ public class SimpleHoodieExecutor<I, O, E> implements 
HoodieExecutor<E> {
    */
   @Override
   public E execute() {
-    checkState(this.consumer.isPresent());
-
     try {
       LOG.info("Starting consumer, consuming records from the records iterator 
directly");
-      while (it.hasNext()) {
-        O payload = transformFunction.apply(it.next());
-        consumer.get().consume(payload);
+      while (itr.hasNext()) {
+        O payload = transformFunction.apply(itr.next());
+        consumer.consume(payload);
       }
 
-      return consumer.get().finish();
+      return consumer.finish();
     } catch (Exception e) {
-      LOG.error("Error consuming records in SimpleHoodieExecutor", e);
+      LOG.error("Failed consuming records", e);
       throw new HoodieException(e);
     }
   }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/WaitStrategyFactory.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/WaitStrategyFactory.java
index dc3c4474810..5c32e7f835e 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/WaitStrategyFactory.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/WaitStrategyFactory.java
@@ -18,14 +18,12 @@
 
 package org.apache.hudi.common.util.queue;
 
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.exception.HoodieException;
-
 import com.lmax.disruptor.BlockingWaitStrategy;
 import com.lmax.disruptor.BusySpinWaitStrategy;
 import com.lmax.disruptor.SleepingWaitStrategy;
 import com.lmax.disruptor.WaitStrategy;
 import com.lmax.disruptor.YieldingWaitStrategy;
+import org.apache.hudi.exception.HoodieException;
 
 import static 
org.apache.hudi.common.util.queue.DisruptorWaitStrategyType.BLOCKING_WAIT;
 
@@ -39,8 +37,8 @@ public class WaitStrategyFactory {
   /**
    * Build WaitStrategy for disruptor
    */
-  public static WaitStrategy build(Option<String> name) {
-    DisruptorWaitStrategyType strategyType = name.isPresent() ? 
DisruptorWaitStrategyType.valueOf(name.get().toUpperCase()) : BLOCKING_WAIT;
+  public static WaitStrategy build(String name) {
+    DisruptorWaitStrategyType strategyType = 
DisruptorWaitStrategyType.valueOf(name);
 
     switch (strategyType) {
       case BLOCKING_WAIT:

Reply via email to