This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new a5434b6b4d [HUDI-3963] Use Lock-Free Message Queue Disruptor Improving 
Hoodie Writing Efficiency  (#5416)
a5434b6b4d is described below

commit a5434b6b4d9bef9eea29bf33f08e7f13753057a9
Author: YueZhang <[email protected]>
AuthorDate: Thu Nov 3 08:02:18 2022 +0800

    [HUDI-3963] Use Lock-Free Message Queue Disruptor Improving Hoodie Writing 
Efficiency  (#5416)
    
    https://issues.apache.org/jira/browse/HUDI-3963
    RFC design : #5567
    
    Add Lock-Free executor to improve hoodie writing throughput and optimize 
execution efficiency.
    Disruptor linked: 
https://lmax-exchange.github.io/disruptor/user-guide/index.html#_introduction. 
Existing BoundedInMemory is the default. Users can enable on a need basis.
    
    Co-authored-by: yuezhang <[email protected]>
---
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  51 ++++
 .../hudi/execution/CopyOnWriteInsertHandler.java   |   4 +-
 .../action/bootstrap/BootstrapRecordConsumer.java  |   8 +-
 .../hudi/table/action/commit/BaseMergeHelper.java  |   8 +-
 .../hudi/execution/ExplicitWriteHandler.java       |   4 +-
 .../hudi/execution/SparkLazyInsertIterable.java    |  13 +-
 .../hudi/util/QueueBasedExecutorFactory.java       |  52 ++++
 .../TestBoundedInMemoryExecutorInSpark.java        |  37 +--
 .../hudi/execution/TestBoundedInMemoryQueue.java   |  26 +-
 .../execution/TestDisruptorExecutionInSpark.java   | 159 ++++++++++
 .../hudi/execution/TestDisruptorMessageQueue.java  | 337 +++++++++++++++++++++
 hudi-common/pom.xml                                |   6 +
 .../hudi/common/util/CustomizedThreadFactory.java  |  22 +-
 .../hudi/common/util/ParquetReaderIterator.java    |   4 +-
 .../common/util/queue/BoundedInMemoryExecutor.java | 143 ++++-----
 ...ueue.java => BoundedInMemoryQueueIterable.java} |  24 +-
 .../hudi/common/util/queue/DisruptorExecutor.java  | 129 ++++++++
 .../common/util/queue/DisruptorMessageQueue.java   | 136 +++++++++
 .../util/queue/DisruptorWaitStrategyType.java      |  64 ++++
 ...nMemoryQueueConsumer.java => ExecutorType.java} |  48 ++-
 .../util/queue/FunctionBasedQueueProducer.java     |   8 +-
 ...emoryQueueProducer.java => HoodieConsumer.java} |  12 +-
 ...emoryQueueProducer.java => HoodieExecutor.java} |  26 +-
 .../hudi/common/util/queue/HoodieExecutorBase.java | 143 +++++++++
 ...oducer.java => HoodieIterableMessageQueue.java} |  15 +-
 ...yQueueProducer.java => HoodieMessageQueue.java} |  35 ++-
 ...emoryQueueProducer.java => HoodieProducer.java} |   8 +-
 ...nsumer.java => IteratorBasedQueueConsumer.java} |  12 +-
 .../util/queue/IteratorBasedQueueProducer.java     |   4 +-
 .../common/util/queue/WaitStrategyFactory.java     |  54 ++++
 .../org/apache/hudi/table/format/FormatUtils.java  |   6 +-
 .../realtime/RealtimeUnmergedRecordReader.java     |   6 +-
 .../benchmark/BoundInMemoryExecutorBenchmark.scala | 135 +++++++++
 pom.xml                                            |   1 +
 34 files changed, 1498 insertions(+), 242 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 0c0966bc5f..514a4e38dc 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -49,6 +49,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.queue.ExecutorType;
 import org.apache.hudi.config.metrics.HoodieMetricsCloudWatchConfig;
 import org.apache.hudi.config.metrics.HoodieMetricsConfig;
 import org.apache.hudi.config.metrics.HoodieMetricsDatadogConfig;
@@ -84,12 +85,14 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Properties;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.common.util.queue.ExecutorType.BOUNDED_IN_MEMORY;
 import static org.apache.hudi.config.HoodieCleanConfig.CLEANER_POLICY;
 
 /**
@@ -132,6 +135,14 @@ public class HoodieWriteConfig extends HoodieConfig {
       .withDocumentation("Key generator class, that implements 
`org.apache.hudi.keygen.KeyGenerator` "
           + "extract a key out of incoming records.");
 
+  public static final ConfigProperty<String> EXECUTOR_TYPE = ConfigProperty
+      .key("hoodie.write.executor.type")
+      .defaultValue(BOUNDED_IN_MEMORY.name())
+      .withDocumentation("Set executor which orchestrates concurrent producers 
and consumers communicating through a message queue."
+          + "default value is BOUNDED_IN_MEMORY which use a bounded in-memory 
queue using LinkedBlockingQueue."
+          + "Also users could use DISRUPTOR, which use disruptor as a lock 
free message queue "
+          + "to gain better writing performance if lock was the bottleneck. 
Although DISRUPTOR_EXECUTOR is still an experimental feature.");
+
   public static final ConfigProperty<String> KEYGENERATOR_TYPE = ConfigProperty
       .key("hoodie.datasource.write.keygenerator.type")
       .defaultValue(KeyGeneratorType.SIMPLE.name())
@@ -233,6 +244,19 @@ public class HoodieWriteConfig extends HoodieConfig {
       .defaultValue(String.valueOf(4 * 1024 * 1024))
       .withDocumentation("Size of in-memory buffer used for parallelizing 
network reads and lake storage writes.");
 
+  public static final ConfigProperty<String> WRITE_DISRUPTOR_BUFFER_SIZE = 
ConfigProperty
+      .key("hoodie.write.executor.disruptor.buffer.size")
+      .defaultValue(String.valueOf(1024))
+      .withDocumentation("The size of the Disruptor Executor ring buffer, must 
be power of 2");
+
+  public static final ConfigProperty<String> WRITE_WAIT_STRATEGY = 
ConfigProperty
+      .key("hoodie.write.executor.disruptor.wait.strategy")
+      .defaultValue("BLOCKING_WAIT")
+      .withDocumentation("Strategy employed for making Disruptor Executor wait 
on a cursor. Other options are "
+          + "SLEEPING_WAIT, it attempts to be conservative with CPU usage by 
using a simple busy wait loop"
+          + "YIELDING_WAIT, it is designed for cases where there is the option 
to burn CPU cycles with the goal of improving latency"
+          + "BUSY_SPIN_WAIT, it can be used in low-latency systems, but puts 
the highest constraints on the deployment environment");
+
   public static final ConfigProperty<String> COMBINE_BEFORE_INSERT = 
ConfigProperty
       .key("hoodie.combine.before.insert")
       .defaultValue("false")
@@ -975,6 +999,10 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getString(KEYGENERATOR_CLASS_NAME);
   }
 
+  public ExecutorType getExecutorType() {
+    return 
ExecutorType.valueOf(getString(EXECUTOR_TYPE).toUpperCase(Locale.ROOT));
+  }
+
   public boolean isCDCEnabled() {
     return getBooleanOrDefault(
         HoodieTableConfig.CDC_ENABLED, 
HoodieTableConfig.CDC_ENABLED.defaultValue());
@@ -1040,6 +1068,14 @@ public class HoodieWriteConfig extends HoodieConfig {
     return 
Integer.parseInt(getStringOrDefault(WRITE_BUFFER_LIMIT_BYTES_VALUE));
   }
 
+  public Option<String> getWriteExecutorWaitStrategy() {
+    return Option.of(getString(WRITE_WAIT_STRATEGY));
+  }
+
+  public Option<Integer> getDisruptorWriteBufferSize() {
+    return 
Option.of(Integer.parseInt(getStringOrDefault(WRITE_DISRUPTOR_BUFFER_SIZE)));
+  }
+
   public boolean shouldCombineBeforeInsert() {
     return getBoolean(COMBINE_BEFORE_INSERT);
   }
@@ -2287,6 +2323,11 @@ public class HoodieWriteConfig extends HoodieConfig {
       return this;
     }
 
+    public Builder withExecutorType(String executorClass) {
+      writeConfig.setValue(EXECUTOR_TYPE, executorClass);
+      return this;
+    }
+
     public Builder withTimelineLayoutVersion(int version) {
       writeConfig.setValue(TIMELINE_LAYOUT_VERSION_NUM, 
String.valueOf(version));
       return this;
@@ -2333,6 +2374,16 @@ public class HoodieWriteConfig extends HoodieConfig {
       return this;
     }
 
+    public Builder withWriteWaitStrategy(String waitStrategy) {
+      writeConfig.setValue(WRITE_WAIT_STRATEGY, String.valueOf(waitStrategy));
+      return this;
+    }
+
+    public Builder withWriteBufferSize(int size) {
+      writeConfig.setValue(WRITE_DISRUPTOR_BUFFER_SIZE, String.valueOf(size));
+      return this;
+    }
+
     public Builder combineInput(boolean onInsert, boolean onUpsert) {
       writeConfig.setValue(COMBINE_BEFORE_INSERT, String.valueOf(onInsert));
       writeConfig.setValue(COMBINE_BEFORE_UPSERT, String.valueOf(onUpsert));
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/CopyOnWriteInsertHandler.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/CopyOnWriteInsertHandler.java
index 5e1f832b7f..abb1122289 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/CopyOnWriteInsertHandler.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/CopyOnWriteInsertHandler.java
@@ -22,7 +22,7 @@ import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
+import org.apache.hudi.common.util.queue.IteratorBasedQueueConsumer;
 import org.apache.hudi.config.HoodieWriteConfig;
 import 
org.apache.hudi.execution.HoodieLazyInsertIterable.HoodieInsertValueGenResult;
 import org.apache.hudi.io.HoodieWriteHandle;
@@ -38,7 +38,7 @@ import java.util.Map;
  * Consumes stream of hoodie records from in-memory queue and writes to one or 
more create-handles.
  */
 public class CopyOnWriteInsertHandler<T extends HoodieRecordPayload>
-    extends 
BoundedInMemoryQueueConsumer<HoodieInsertValueGenResult<HoodieRecord>, 
List<WriteStatus>> {
+    extends 
IteratorBasedQueueConsumer<HoodieInsertValueGenResult<HoodieRecord>, 
List<WriteStatus>> {
 
   private HoodieWriteConfig config;
   private String instantTime;
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapRecordConsumer.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapRecordConsumer.java
index 8966a5d51c..76968c6108 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapRecordConsumer.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapRecordConsumer.java
@@ -20,7 +20,7 @@ package org.apache.hudi.table.action.bootstrap;
 
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
+import org.apache.hudi.common.util.queue.IteratorBasedQueueConsumer;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.io.HoodieBootstrapHandle;
 
@@ -29,7 +29,7 @@ import java.io.IOException;
 /**
  * Consumer that dequeues records from queue and sends to Merge Handle for 
writing.
  */
-public class BootstrapRecordConsumer extends 
BoundedInMemoryQueueConsumer<HoodieRecord, Void> {
+public class BootstrapRecordConsumer extends 
IteratorBasedQueueConsumer<HoodieRecord, Void> {
 
   private final HoodieBootstrapHandle bootstrapHandle;
 
@@ -38,7 +38,7 @@ public class BootstrapRecordConsumer extends 
BoundedInMemoryQueueConsumer<Hoodie
   }
 
   @Override
-  protected void consumeOneRecord(HoodieRecord record) {
+  public void consumeOneRecord(HoodieRecord record) {
     try {
       bootstrapHandle.write(record, ((HoodieRecordPayload) record.getData())
           .getInsertValue(bootstrapHandle.getWriterSchemaWithMetaFields()));
@@ -48,7 +48,7 @@ public class BootstrapRecordConsumer extends 
BoundedInMemoryQueueConsumer<Hoodie
   }
 
   @Override
-  protected void finish() {}
+  public void finish() {}
 
   @Override
   protected Void getResult() {
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java
index 5ead348140..bd1c01958b 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java
@@ -22,7 +22,7 @@ import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.client.utils.MergingIterator;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
+import org.apache.hudi.common.util.queue.IteratorBasedQueueConsumer;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.io.HoodieMergeHandle;
 import org.apache.hudi.io.storage.HoodieFileReader;
@@ -109,7 +109,7 @@ public abstract class BaseMergeHelper<T extends 
HoodieRecordPayload, I, K, O> {
   /**
    * Consumer that dequeues records from queue and sends to Merge Handle.
    */
-  protected static class UpdateHandler extends 
BoundedInMemoryQueueConsumer<GenericRecord, Void> {
+  protected static class UpdateHandler extends 
IteratorBasedQueueConsumer<GenericRecord, Void> {
 
     private final HoodieMergeHandle upsertHandle;
 
@@ -118,12 +118,12 @@ public abstract class BaseMergeHelper<T extends 
HoodieRecordPayload, I, K, O> {
     }
 
     @Override
-    protected void consumeOneRecord(GenericRecord record) {
+    public void consumeOneRecord(GenericRecord record) {
       upsertHandle.write(record);
     }
 
     @Override
-    protected void finish() {}
+    public void finish() {}
 
     @Override
     protected Void getResult() {
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/ExplicitWriteHandler.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/ExplicitWriteHandler.java
index 46eff58757..9dab2170e0 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/ExplicitWriteHandler.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/ExplicitWriteHandler.java
@@ -21,7 +21,7 @@ package org.apache.hudi.execution;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
+import org.apache.hudi.common.util.queue.IteratorBasedQueueConsumer;
 import org.apache.hudi.io.HoodieWriteHandle;
 
 import java.util.ArrayList;
@@ -31,7 +31,7 @@ import java.util.List;
  * Consumes stream of hoodie records from in-memory queue and writes to one 
explicit create handle.
  */
 public class ExplicitWriteHandler<T extends HoodieRecordPayload>
-    extends 
BoundedInMemoryQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
 List<WriteStatus>> {
+    extends 
IteratorBasedQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
 List<WriteStatus>> {
 
   private final List<WriteStatus> statuses = new ArrayList<>();
 
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java
index df5bd2d3f4..d2555f9598 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java
@@ -23,13 +23,14 @@ import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
+import org.apache.hudi.common.util.queue.HoodieExecutor;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.io.WriteHandleFactory;
 import org.apache.hudi.table.HoodieTable;
 
 import org.apache.avro.Schema;
+import org.apache.hudi.util.QueueBasedExecutorFactory;
 
 import java.util.Iterator;
 import java.util.List;
@@ -77,16 +78,16 @@ public class SparkLazyInsertIterable<T extends 
HoodieRecordPayload> extends Hood
   @Override
   protected List<WriteStatus> computeNext() {
     // Executor service used for launching writer thread.
-    BoundedInMemoryExecutor<HoodieRecord<T>, 
HoodieInsertValueGenResult<HoodieRecord>, List<WriteStatus>> 
bufferedIteratorExecutor =
-        null;
+    HoodieExecutor<?, ?, List<WriteStatus>> bufferedIteratorExecutor = null;
     try {
       Schema schema = new Schema.Parser().parse(hoodieConfig.getSchema());
       if (useWriterSchema) {
         schema = HoodieAvroUtils.addMetadataFields(schema);
       }
-      bufferedIteratorExecutor =
-          new 
BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), inputItr, 
getInsertHandler(),
-              getTransformFunction(schema, hoodieConfig), 
hoodieTable.getPreExecuteRunnable());
+
+      bufferedIteratorExecutor = 
QueueBasedExecutorFactory.create(hoodieConfig, inputItr, getInsertHandler(),
+          getTransformFunction(schema, hoodieConfig), 
hoodieTable.getPreExecuteRunnable());
+
       final List<WriteStatus> result = bufferedIteratorExecutor.execute();
       assert result != null && !result.isEmpty() && 
!bufferedIteratorExecutor.isRemaining();
       return result;
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/util/QueueBasedExecutorFactory.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/util/QueueBasedExecutorFactory.java
new file mode 100644
index 0000000000..ba8ddbd1ec
--- /dev/null
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/util/QueueBasedExecutorFactory.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.util;
+
+import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
+import org.apache.hudi.common.util.queue.DisruptorExecutor;
+import org.apache.hudi.common.util.queue.ExecutorType;
+import org.apache.hudi.common.util.queue.HoodieExecutor;
+import org.apache.hudi.common.util.queue.IteratorBasedQueueConsumer;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+
+import java.util.Iterator;
+import java.util.function.Function;
+
+public class QueueBasedExecutorFactory {
+
+  /**
+   * Create a new hoodie executor instance on demand.
+   */
+  public static <I, O, E> HoodieExecutor create(HoodieWriteConfig 
hoodieConfig, Iterator<I> inputItr, IteratorBasedQueueConsumer<O, E> consumer,
+                                             Function<I, O> transformFunction, 
Runnable preExecuteRunnable) {
+    ExecutorType executorType = hoodieConfig.getExecutorType();
+
+    switch (executorType) {
+      case BOUNDED_IN_MEMORY:
+        return new 
BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), inputItr, 
consumer,
+            transformFunction, preExecuteRunnable);
+      case DISRUPTOR:
+        return new 
DisruptorExecutor<>(hoodieConfig.getDisruptorWriteBufferSize(), inputItr, 
consumer,
+            transformFunction, hoodieConfig.getWriteExecutorWaitStrategy(), 
preExecuteRunnable);
+      default:
+        throw new HoodieException("Unsupported Executor Type " + executorType);
+    }
+  }
+}
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java
index a714d60d00..040634da4c 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java
@@ -23,7 +23,7 @@ import 
org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
-import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
+import org.apache.hudi.common.util.queue.IteratorBasedQueueConsumer;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.testutils.HoodieClientTestHarness;
@@ -71,24 +71,21 @@ public class TestBoundedInMemoryExecutorInSpark extends 
HoodieClientTestHarness
   @Test
   public void testExecutor() {
 
-    final List<HoodieRecord> hoodieRecords = 
dataGen.generateInserts(instantTime, 100);
+    final int recordNumber = 100;
+    final List<HoodieRecord> hoodieRecords = 
dataGen.generateInserts(instantTime, recordNumber);
 
     HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class);
     when(hoodieWriteConfig.getWriteBufferLimitBytes()).thenReturn(1024);
-    
BoundedInMemoryQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
 Integer> consumer =
-        new 
BoundedInMemoryQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
 Integer>() {
+    
IteratorBasedQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
 Integer> consumer =
+        new 
IteratorBasedQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
 Integer>() {
 
           private int count = 0;
 
           @Override
-          protected void 
consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>
 record) {
+          public void 
consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>
 record) {
             count++;
           }
 
-          @Override
-          protected void finish() {
-          }
-
           @Override
           protected Integer getResult() {
             return count;
@@ -100,7 +97,7 @@ public class TestBoundedInMemoryExecutorInSpark extends 
HoodieClientTestHarness
       executor = new 
BoundedInMemoryExecutor(hoodieWriteConfig.getWriteBufferLimitBytes(), 
hoodieRecords.iterator(), consumer,
           getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA), 
getPreExecuteRunnable());
       int result = executor.execute();
-      // It should buffer and write 100 records
+
       assertEquals(100, result);
       // There should be no remaining records in the buffer
       assertFalse(executor.isRemaining());
@@ -118,11 +115,11 @@ public class TestBoundedInMemoryExecutorInSpark extends 
HoodieClientTestHarness
 
     HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class);
     when(hoodieWriteConfig.getWriteBufferLimitBytes()).thenReturn(1024);
-    
BoundedInMemoryQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
 Integer> consumer =
-        new 
BoundedInMemoryQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
 Integer>() {
+    
IteratorBasedQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
 Integer> consumer =
+        new 
IteratorBasedQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
 Integer>() {
 
           @Override
-          protected void 
consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>
 record) {
+          public void 
consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>
 record) {
             try {
               while (true) {
                 Thread.sleep(1000);
@@ -132,10 +129,6 @@ public class TestBoundedInMemoryExecutorInSpark extends 
HoodieClientTestHarness
             }
           }
 
-          @Override
-          protected void finish() {
-          }
-
           @Override
           protected Integer getResult() {
             return 0;
@@ -176,14 +169,10 @@ public class TestBoundedInMemoryExecutorInSpark extends 
HoodieClientTestHarness
       }
     };
 
-    
BoundedInMemoryQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
 Integer> consumer =
-        new 
BoundedInMemoryQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
 Integer>() {
-          @Override
-          protected void 
consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>
 record) {
-          }
-
+    
IteratorBasedQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
 Integer> consumer =
+        new 
IteratorBasedQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
 Integer>() {
           @Override
-          protected void finish() {
+          public void 
consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>
 record) {
           }
 
           @Override
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java
index 4707a68072..c36554bb64 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java
@@ -26,9 +26,9 @@ import org.apache.hudi.common.util.DefaultSizeEstimator;
 import org.apache.hudi.common.util.FileIOUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.SizeEstimator;
-import org.apache.hudi.common.util.queue.BoundedInMemoryQueue;
-import org.apache.hudi.common.util.queue.BoundedInMemoryQueueProducer;
+import org.apache.hudi.common.util.queue.BoundedInMemoryQueueIterable;
 import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer;
+import org.apache.hudi.common.util.queue.HoodieProducer;
 import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.testutils.HoodieClientTestHarness;
@@ -83,8 +83,8 @@ public class TestBoundedInMemoryQueue extends 
HoodieClientTestHarness {
   public void testRecordReading() throws Exception {
     final int numRecords = 128;
     final List<HoodieRecord> hoodieRecords = 
dataGen.generateInserts(instantTime, numRecords);
-    final BoundedInMemoryQueue<HoodieRecord, 
HoodieLazyInsertIterable.HoodieInsertValueGenResult> queue =
-        new BoundedInMemoryQueue(FileIOUtils.KB, 
getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA));
+    final BoundedInMemoryQueueIterable<HoodieRecord, 
HoodieLazyInsertIterable.HoodieInsertValueGenResult> queue =
+        new BoundedInMemoryQueueIterable(FileIOUtils.KB, 
getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA));
     // Produce
     Future<Boolean> resFuture = executorService.submit(() -> {
       new 
IteratorBasedQueueProducer<>(hoodieRecords.iterator()).produce(queue);
@@ -123,8 +123,8 @@ public class TestBoundedInMemoryQueue extends 
HoodieClientTestHarness {
     final int numProducers = 40;
     final List<List<HoodieRecord>> recs = new ArrayList<>();
 
-    final BoundedInMemoryQueue<HoodieRecord, 
HoodieLazyInsertIterable.HoodieInsertValueGenResult> queue =
-        new BoundedInMemoryQueue(FileIOUtils.KB, 
getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA));
+    final BoundedInMemoryQueueIterable<HoodieRecord, 
HoodieLazyInsertIterable.HoodieInsertValueGenResult> queue =
+        new BoundedInMemoryQueueIterable(FileIOUtils.KB, 
getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA));
 
     // Record Key to <Producer Index, Rec Index within a producer>
     Map<String, Tuple2<Integer, Integer>> keyToProducerAndIndexMap = new 
HashMap<>();
@@ -140,7 +140,7 @@ public class TestBoundedInMemoryQueue extends 
HoodieClientTestHarness {
       recs.add(pRecs);
     }
 
-    List<BoundedInMemoryQueueProducer<HoodieRecord>> producers = new 
ArrayList<>();
+    List<HoodieProducer<HoodieRecord>> producers = new ArrayList<>();
     for (int i = 0; i < recs.size(); i++) {
       final List<HoodieRecord> r = recs.get(i);
       // Alternate between pull and push based iterators
@@ -222,8 +222,8 @@ public class TestBoundedInMemoryQueue extends 
HoodieClientTestHarness {
         
getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA).apply((HoodieAvroRecord)
 hoodieRecords.get(0));
     final long objSize = sizeEstimator.sizeEstimate(payload);
     final long memoryLimitInBytes = recordLimit * objSize;
-    final BoundedInMemoryQueue<HoodieRecord, 
HoodieLazyInsertIterable.HoodieInsertValueGenResult> queue =
-        new BoundedInMemoryQueue(memoryLimitInBytes, 
getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA));
+    final BoundedInMemoryQueueIterable<HoodieRecord, 
HoodieLazyInsertIterable.HoodieInsertValueGenResult> queue =
+        new BoundedInMemoryQueueIterable(memoryLimitInBytes, 
getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA));
 
     // Produce
     executorService.submit(() -> {
@@ -275,8 +275,8 @@ public class TestBoundedInMemoryQueue extends 
HoodieClientTestHarness {
     // first let us throw exception from queueIterator reader and test that 
queueing thread
     // stops and throws
     // correct exception back.
-    BoundedInMemoryQueue<HoodieRecord, Tuple2<HoodieRecord, 
Option<IndexedRecord>>> queue1 =
-        new BoundedInMemoryQueue(memoryLimitInBytes, 
getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA));
+    BoundedInMemoryQueueIterable<HoodieRecord, Tuple2<HoodieRecord, 
Option<IndexedRecord>>> queue1 =
+        new BoundedInMemoryQueueIterable(memoryLimitInBytes, 
getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA));
 
     // Produce
     Future<Boolean> resFuture = executorService.submit(() -> {
@@ -303,8 +303,8 @@ public class TestBoundedInMemoryQueue extends 
HoodieClientTestHarness {
     final Iterator<HoodieRecord> mockHoodieRecordsIterator = 
mock(Iterator.class);
     when(mockHoodieRecordsIterator.hasNext()).thenReturn(true);
     when(mockHoodieRecordsIterator.next()).thenThrow(expectedException);
-    BoundedInMemoryQueue<HoodieRecord, Tuple2<HoodieRecord, 
Option<IndexedRecord>>> queue2 =
-        new BoundedInMemoryQueue(memoryLimitInBytes, 
getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA));
+    BoundedInMemoryQueueIterable<HoodieRecord, Tuple2<HoodieRecord, 
Option<IndexedRecord>>> queue2 =
+        new BoundedInMemoryQueueIterable(memoryLimitInBytes, 
getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA));
 
     // Produce
     Future<Boolean> res = executorService.submit(() -> {
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorExecutionInSpark.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorExecutionInSpark.java
new file mode 100644
index 0000000000..2351f2bbed
--- /dev/null
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorExecutionInSpark.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution;
+
+import static 
org.apache.hudi.execution.HoodieLazyInsertIterable.getTransformFunction;
+
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.queue.IteratorBasedQueueConsumer;
+import org.apache.hudi.common.util.queue.DisruptorExecutor;
+import org.apache.hudi.common.util.queue.WaitStrategyFactory;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.testutils.HoodieClientTestHarness;
+import org.apache.spark.TaskContext;
+import org.apache.spark.TaskContext$;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import scala.Tuple2;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestDisruptorExecutionInSpark extends HoodieClientTestHarness {
+
+  private final String instantTime = 
HoodieActiveTimeline.createNewInstantTime();
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    initTestDataGenerator();
+    initExecutorServiceWithFixedThreadPool(2);
+  }
+
+  @AfterEach
+  public void tearDown() throws Exception {
+    cleanupResources();
+  }
+
+  private Runnable getPreExecuteRunnable() {
+    final TaskContext taskContext = TaskContext.get();
+    return () -> TaskContext$.MODULE$.setTaskContext(taskContext);
+  }
+
+  @Test
+  public void testExecutor() {
+
+    final List<HoodieRecord> hoodieRecords = 
dataGen.generateInserts(instantTime, 128);
+    final List<HoodieRecord> consumedRecords = new ArrayList<>();
+
+    HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class);
+    
when(hoodieWriteConfig.getDisruptorWriteBufferSize()).thenReturn(Option.of(8));
+    
IteratorBasedQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
 Integer> consumer =
+        new 
IteratorBasedQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
 Integer>() {
+
+          private int count = 0;
+
+          @Override
+          public void 
consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>
 record) {
+            consumedRecords.add(record.record);
+            count++;
+          }
+
+          @Override
+          protected Integer getResult() {
+            return count;
+          }
+        };
+    DisruptorExecutor<HoodieRecord, Tuple2<HoodieRecord, 
Option<IndexedRecord>>, Integer> exec = null;
+
+    try {
+      exec = new 
DisruptorExecutor(hoodieWriteConfig.getDisruptorWriteBufferSize(), 
hoodieRecords.iterator(), consumer,
+          getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA), 
Option.of(WaitStrategyFactory.DEFAULT_STRATEGY), getPreExecuteRunnable());
+      int result = exec.execute();
+      // It should buffer and write 128 records
+      assertEquals(128, result);
+      // There should be no remaining records in the buffer
+      assertFalse(exec.isRemaining());
+
+      // collect all records and assert that consumed records are identical to 
produced ones
+      // assert there's no tampering, and that the ordering is preserved
+      assertEquals(hoodieRecords, consumedRecords);
+      for (int i = 0; i < hoodieRecords.size(); i++) {
+        assertEquals(hoodieRecords.get(i), consumedRecords.get(i));
+      }
+
+    } finally {
+      if (exec != null) {
+        exec.shutdownNow();
+      }
+    }
+  }
+
+  @Test
+  @Timeout(value = 60)
+  public void testInterruptExecutor() {
+    final List<HoodieRecord> hoodieRecords = 
dataGen.generateInserts(instantTime, 100);
+
+    HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class);
+    
when(hoodieWriteConfig.getDisruptorWriteBufferSize()).thenReturn(Option.of(1024));
+    
IteratorBasedQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
 Integer> consumer =
+        new 
IteratorBasedQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
 Integer>() {
+
+          @Override
+          public void 
consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>
 record) {
+            try {
+              Thread.currentThread().wait();
+            } catch (InterruptedException ie) {
+              // ignore here
+            }
+          }
+
+          @Override
+          protected Integer getResult() {
+            return 0;
+          }
+        };
+
+    DisruptorExecutor<HoodieRecord, Tuple2<HoodieRecord, 
Option<IndexedRecord>>, Integer>
+        executor = new 
DisruptorExecutor(hoodieWriteConfig.getDisruptorWriteBufferSize(), 
hoodieRecords.iterator(), consumer,
+        getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA), 
Option.of(WaitStrategyFactory.DEFAULT_STRATEGY), getPreExecuteRunnable());
+
+    try {
+      Thread.currentThread().interrupt();
+      assertThrows(HoodieException.class, executor::execute);
+      assertTrue(Thread.interrupted());
+    } catch (Exception e) {
+      // ignore here
+    }
+  }
+}
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorMessageQueue.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorMessageQueue.java
new file mode 100644
index 0000000000..d296d56440
--- /dev/null
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorMessageQueue.java
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution;
+
+import static 
org.apache.hudi.execution.HoodieLazyInsertIterable.getTransformFunction;
+
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.common.util.queue.DisruptorMessageQueue;
+import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer;
+import org.apache.hudi.common.util.queue.HoodieProducer;
+import org.apache.hudi.common.util.queue.IteratorBasedQueueConsumer;
+import org.apache.hudi.common.util.queue.DisruptorExecutor;
+import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer;
+import org.apache.hudi.common.util.queue.WaitStrategyFactory;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.testutils.HoodieClientTestHarness;
+import org.apache.spark.TaskContext;
+import org.apache.spark.TaskContext$;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import scala.Tuple2;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestDisruptorMessageQueue extends HoodieClientTestHarness {
+
+  private final String instantTime = 
HoodieActiveTimeline.createNewInstantTime();
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    initTestDataGenerator();
+    initExecutorServiceWithFixedThreadPool(2);
+  }
+
+  @AfterEach
+  public void tearDown() throws Exception {
+    cleanupResources();
+  }
+
+  private Runnable getPreExecuteRunnable() {
+    final TaskContext taskContext = TaskContext.get();
+    return () -> TaskContext$.MODULE$.setTaskContext(taskContext);
+  }
+
+  // Test to ensure that we are reading all records from queue iterator in the 
same order
+  // without any exceptions.
+  @SuppressWarnings("unchecked")
+  @Test
+  @Timeout(value = 60)
+  public void testRecordReading() throws Exception {
+
+    final List<HoodieRecord> hoodieRecords = 
dataGen.generateInserts(instantTime, 100);
+    ArrayList<HoodieRecord> beforeRecord = new ArrayList<>();
+    ArrayList<IndexedRecord> beforeIndexedRecord = new ArrayList<>();
+    ArrayList<HoodieAvroRecord> afterRecord = new ArrayList<>();
+    ArrayList<IndexedRecord> afterIndexedRecord = new ArrayList<>();
+
+    hoodieRecords.forEach(record -> {
+      final HoodieAvroRecord originalRecord = (HoodieAvroRecord) record;
+      beforeRecord.add(originalRecord);
+      try {
+        final Option<IndexedRecord> originalInsertValue =
+            
originalRecord.getData().getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA);
+        beforeIndexedRecord.add(originalInsertValue.get());
+      } catch (IOException e) {
+        // ignore exception here.
+      }
+    });
+
+    HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class);
+    
when(hoodieWriteConfig.getDisruptorWriteBufferSize()).thenReturn(Option.of(16));
+    
IteratorBasedQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
 Integer> consumer =
+        new 
IteratorBasedQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
 Integer>() {
+
+          private int count = 0;
+
+          @Override
+          public void 
consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>
 record) {
+            count++;
+            afterRecord.add((HoodieAvroRecord) record.record);
+            try {
+              IndexedRecord indexedRecord = (IndexedRecord)((HoodieAvroRecord) 
record.record)
+                  
.getData().getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA).get();
+              afterIndexedRecord.add(indexedRecord);
+            } catch (IOException e) {
+              // ignore exception here.
+            }
+          }
+
+          @Override
+          protected Integer getResult() {
+            return count;
+          }
+    };
+
+    DisruptorExecutor<HoodieRecord, Tuple2<HoodieRecord, 
Option<IndexedRecord>>, Integer> exec = null;
+
+    try {
+      exec = new 
DisruptorExecutor(hoodieWriteConfig.getDisruptorWriteBufferSize(), 
hoodieRecords.iterator(), consumer,
+          getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA), 
Option.of(WaitStrategyFactory.DEFAULT_STRATEGY), getPreExecuteRunnable());
+      int result = exec.execute();
+      // It should buffer and write 100 records
+      assertEquals(100, result);
+      // There should be no remaining records in the buffer
+      assertFalse(exec.isRemaining());
+
+      assertEquals(beforeRecord, afterRecord);
+      assertEquals(beforeIndexedRecord, afterIndexedRecord);
+
+    } finally {
+      if (exec != null) {
+        exec.shutdownNow();
+      }
+    }
+  }
+
+  /**
+   * Test to ensure that we are reading all records from queue iterator when 
we have multiple producers.
+   */
+  @SuppressWarnings("unchecked")
+  @Test
+  @Timeout(value = 60)
+  public void testCompositeProducerRecordReading() throws Exception {
+    final int numRecords = 1000;
+    final int numProducers = 40;
+    final List<List<HoodieRecord>> recs = new ArrayList<>();
+
+    final DisruptorMessageQueue<HoodieRecord, 
HoodieLazyInsertIterable.HoodieInsertValueGenResult> queue =
+        new DisruptorMessageQueue(Option.of(1024), 
getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA),
+            Option.of("BLOCKING_WAIT"), numProducers, new Runnable() {
+              @Override
+          public void run() {
+                // do nothing.
+              }
+            });
+
+    // Record Key to <Producer Index, Rec Index within a producer>
+    Map<String, Pair<Integer, Integer>> keyToProducerAndIndexMap = new 
HashMap<>();
+
+    for (int i = 0; i < numProducers; i++) {
+      List<HoodieRecord> pRecs = dataGen.generateInserts(instantTime, 
numRecords);
+      int j = 0;
+      for (HoodieRecord r : pRecs) {
+        assertFalse(keyToProducerAndIndexMap.containsKey(r.getRecordKey()));
+        keyToProducerAndIndexMap.put(r.getRecordKey(), Pair.of(i, j));
+        j++;
+      }
+      recs.add(pRecs);
+    }
+
+    List<HoodieProducer> producers = new ArrayList<>();
+    for (int i = 0; i < recs.size(); i++) {
+      final List<HoodieRecord> r = recs.get(i);
+      // Alternate between pull and push based iterators
+      if (i % 2 == 0) {
+        HoodieProducer producer = new 
IteratorBasedQueueProducer<>(r.iterator());
+        producers.add(producer);
+      } else {
+        HoodieProducer producer = new FunctionBasedQueueProducer<>((buf) -> {
+          Iterator<HoodieRecord> itr = r.iterator();
+          while (itr.hasNext()) {
+            try {
+              buf.insertRecord(itr.next());
+            } catch (Exception e) {
+              throw new HoodieException(e);
+            }
+          }
+          return true;
+        });
+        producers.add(producer);
+      }
+    }
+
+    // Used to ensure that consumer sees the records generated by a single 
producer in FIFO order
+    Map<Integer, Integer> lastSeenMap =
+        IntStream.range(0, 
numProducers).boxed().collect(Collectors.toMap(Function.identity(), x -> -1));
+    Map<Integer, Integer> countMap =
+        IntStream.range(0, 
numProducers).boxed().collect(Collectors.toMap(Function.identity(), x -> 0));
+
+    // setup consumer and start disruptor
+    
IteratorBasedQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
 Integer> consumer =
+        new 
IteratorBasedQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
 Integer>() {
+
+          @Override
+          public void 
consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>
 payload) {
+            // Read recs and ensure we have covered all producer recs.
+            final HoodieRecord rec = payload.record;
+            Pair<Integer, Integer> producerPos = 
keyToProducerAndIndexMap.get(rec.getRecordKey());
+            Integer lastSeenPos = lastSeenMap.get(producerPos.getLeft());
+            countMap.put(producerPos.getLeft(), 
countMap.get(producerPos.getLeft()) + 1);
+            lastSeenMap.put(producerPos.getLeft(), lastSeenPos + 1);
+            // Ensure we are seeing the next record generated
+            assertEquals(lastSeenPos + 1, producerPos.getRight().intValue());
+          }
+
+          @Override
+          protected Integer getResult() {
+            return 0;
+          }
+        };
+
+    Method setHandlersFunc = queue.getClass().getDeclaredMethod("setHandlers", 
IteratorBasedQueueConsumer.class);
+    setHandlersFunc.setAccessible(true);
+    setHandlersFunc.invoke(queue, consumer);
+
+    Method startFunc = queue.getClass().getDeclaredMethod("start");
+    startFunc.setAccessible(true);
+    startFunc.invoke(queue);
+
+    // start to produce records
+    CompletableFuture<Void> producerFuture = 
CompletableFuture.allOf(producers.stream().map(producer -> {
+      return CompletableFuture.supplyAsync(() -> {
+        try {
+          producer.produce(queue);
+        } catch (Throwable e) {
+          throw new HoodieException("Error producing records in disruptor 
executor", e);
+        }
+        return true;
+      }, executorService);
+    }).toArray(CompletableFuture[]::new));
+
+    producerFuture.get();
+
+    // wait for all the records consumed.
+    queue.close();
+
+    for (int i = 0; i < numProducers; i++) {
+      // Ensure we have seen all the records for each producers
+      assertEquals(Integer.valueOf(numRecords), countMap.get(i));
+    }
+  }
+
+  /**
+   * Test to ensure that an exception from one of the producers will stop the 
current ingestion.
+   */
+  @SuppressWarnings("unchecked")
+  @Test
+  @Timeout(value = 60)
+  public void testException() throws Exception {
+    final int numRecords = 1000;
+    final int numProducers = 40;
+
+    final DisruptorMessageQueue<HoodieRecord, 
HoodieLazyInsertIterable.HoodieInsertValueGenResult> queue =
+        new DisruptorMessageQueue(Option.of(1024), 
getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA),
+            Option.of("BLOCKING_WAIT"), numProducers, new Runnable() {
+              @Override
+          public void run() {
+              // do nothing.
+              }
+            });
+
+    List<HoodieRecord> pRecs = dataGen.generateInserts(instantTime, 
numRecords);
+
+    // create 2 producers
+    // producer1 : common producer
+    // producer2 : exception producer
+    List<HoodieProducer> producers = new ArrayList<>();
+    for (int i = 0; i < 2; i++) {
+      if (i % 2 == 0) {
+        producers.add(new IteratorBasedQueueProducer<>(pRecs.iterator()));
+      } else {
+        producers.add(new FunctionBasedQueueProducer<>((buf) -> {
+          throw new HoodieException("Exception when produce records!!!");
+        }));
+      }
+    }
+
+
+    
IteratorBasedQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
 Integer> consumer =
+        new 
IteratorBasedQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
 Integer>() {
+
+      int count = 0;
+      @Override
+      public void 
consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>
 payload) {
+        // Read recs and ensure we have covered all producer recs.
+        final HoodieRecord rec = payload.record;
+        count++;
+      }
+
+      @Override
+      protected Integer getResult() {
+        return count;
+      }
+    };
+
+    DisruptorExecutor<HoodieRecord, Tuple2<HoodieRecord, 
Option<IndexedRecord>>, Integer> exec = new DisruptorExecutor(Option.of(1024),
+        producers, Option.of(consumer), 
getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA),
+        Option.of(WaitStrategyFactory.DEFAULT_STRATEGY), 
getPreExecuteRunnable());
+
+    final Throwable thrown = assertThrows(HoodieException.class, exec::execute,
+        "exception is expected");
+    assertTrue(thrown.getMessage().contains("Error producing records in 
disruptor executor"));
+  }
+}
diff --git a/hudi-common/pom.xml b/hudi-common/pom.xml
index 87b8e5e0be..15becc3c16 100644
--- a/hudi-common/pom.xml
+++ b/hudi-common/pom.xml
@@ -299,5 +299,11 @@
       <artifactId>joda-time</artifactId>
       <scope>test</scope>
     </dependency>
+
+    <dependency>
+      <groupId>com.lmax</groupId>
+      <artifactId>disruptor</artifactId>
+      <version>${disruptor.version}</version>
+    </dependency>
   </dependencies>
 </project>
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/CustomizedThreadFactory.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/util/CustomizedThreadFactory.java
index 738be514b2..a13f3a804f 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/util/CustomizedThreadFactory.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/util/CustomizedThreadFactory.java
@@ -33,6 +33,8 @@ public class CustomizedThreadFactory implements ThreadFactory 
{
   private final String threadName;
   private final boolean daemon;
 
+  private Runnable preExecuteRunnable;
+
   public CustomizedThreadFactory() {
     this("pool-" + POOL_NUM.getAndIncrement(), false);
   }
@@ -41,6 +43,16 @@ public class CustomizedThreadFactory implements 
ThreadFactory {
     this(threadNamePrefix, false);
   }
 
+  public CustomizedThreadFactory(String threadNamePrefix, Runnable 
preExecuteRunnable) {
+    this(threadNamePrefix, false, preExecuteRunnable);
+  }
+
+  public CustomizedThreadFactory(String threadNamePrefix, boolean daemon, 
Runnable preExecuteRunnable) {
+    this.threadName = threadNamePrefix + "-thread-";
+    this.daemon = daemon;
+    this.preExecuteRunnable = preExecuteRunnable;
+  }
+
   public CustomizedThreadFactory(String threadNamePrefix, boolean daemon) {
     this.threadName = threadNamePrefix + "-thread-";
     this.daemon = daemon;
@@ -48,7 +60,15 @@ public class CustomizedThreadFactory implements 
ThreadFactory {
 
   @Override
   public Thread newThread(@NotNull Runnable r) {
-    Thread runThread = new Thread(r);
+    Thread runThread = preExecuteRunnable == null ? new Thread(r) : new 
Thread(new Runnable() {
+
+      @Override
+      public void run() {
+        preExecuteRunnable.run();
+        r.run();
+      }
+    });
+
     runThread.setDaemon(daemon);
     runThread.setName(threadName + threadNum.getAndIncrement());
     return runThread;
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetReaderIterator.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetReaderIterator.java
index 03bd471b60..347bcdf77a 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetReaderIterator.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetReaderIterator.java
@@ -18,7 +18,7 @@
 
 package org.apache.hudi.common.util;
 
-import org.apache.hudi.common.util.queue.BoundedInMemoryQueue;
+import org.apache.hudi.common.util.queue.BoundedInMemoryQueueIterable;
 import org.apache.hudi.exception.HoodieException;
 
 import org.apache.parquet.hadoop.ParquetReader;
@@ -27,7 +27,7 @@ import java.io.IOException;
 
 /**
  * This class wraps a parquet reader and provides an iterator based api to 
read from a parquet file. This is used in
- * {@link BoundedInMemoryQueue}
+ * {@link BoundedInMemoryQueueIterable}
  */
 public class ParquetReaderIterator<T> implements ClosableIterator<T> {
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java
index 46ef5dc40c..ce5898c7c3 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java
@@ -18,119 +18,98 @@
 
 package org.apache.hudi.common.util.queue;
 
-import org.apache.hudi.common.util.CustomizedThreadFactory;
 import org.apache.hudi.common.util.DefaultSizeEstimator;
 import org.apache.hudi.common.util.Functions;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.SizeEstimator;
 import org.apache.hudi.exception.HoodieException;
 
+import org.apache.hudi.exception.HoodieIOException;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
+import java.io.IOException;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
-import java.util.stream.Collectors;
 
 /**
- * Executor which orchestrates concurrent producers and consumers 
communicating through a bounded in-memory queue. This
+ * Executor which orchestrates concurrent producers and consumers 
communicating through 'BoundedInMemoryQueue'. This
  * class takes as input the size limit, queue producer(s), consumer and 
transformer and exposes API to orchestrate
  * concurrent execution of these actors communicating through a central 
bounded queue
  */
-public class BoundedInMemoryExecutor<I, O, E> {
+public class BoundedInMemoryExecutor<I, O, E> extends HoodieExecutorBase<I, O, 
E> {
 
   private static final Logger LOG = 
LogManager.getLogger(BoundedInMemoryExecutor.class);
-  private static final long TERMINATE_WAITING_TIME_SECS = 60L;
-  // Executor service used for launching write thread.
-  private final ExecutorService producerExecutorService;
-  // Executor service used for launching read thread.
-  private final ExecutorService consumerExecutorService;
-  // Used for buffering records which is controlled by 
HoodieWriteConfig#WRITE_BUFFER_LIMIT_BYTES.
-  private final BoundedInMemoryQueue<I, O> queue;
-  // Producers
-  private final List<BoundedInMemoryQueueProducer<I>> producers;
-  // Consumer
-  private final Option<BoundedInMemoryQueueConsumer<O, E>> consumer;
-  // pre-execute function to implement environment specific behavior before 
executors (producers/consumer) run
-  private final Runnable preExecuteRunnable;
+  private final HoodieMessageQueue<I, O> queue;
 
   public BoundedInMemoryExecutor(final long bufferLimitInBytes, final 
Iterator<I> inputItr,
-                                 BoundedInMemoryQueueConsumer<O, E> consumer, 
Function<I, O> transformFunction, Runnable preExecuteRunnable) {
+                                 IteratorBasedQueueConsumer<O, E> consumer, 
Function<I, O> transformFunction, Runnable preExecuteRunnable) {
     this(bufferLimitInBytes, new IteratorBasedQueueProducer<>(inputItr), 
Option.of(consumer), transformFunction, preExecuteRunnable);
   }
 
-  public BoundedInMemoryExecutor(final long bufferLimitInBytes, 
BoundedInMemoryQueueProducer<I> producer,
-                                 Option<BoundedInMemoryQueueConsumer<O, E>> 
consumer, final Function<I, O> transformFunction) {
+  public BoundedInMemoryExecutor(final long bufferLimitInBytes, 
HoodieProducer<I> producer,
+                                 Option<IteratorBasedQueueConsumer<O, E>> 
consumer, final Function<I, O> transformFunction) {
     this(bufferLimitInBytes, producer, consumer, transformFunction, 
Functions.noop());
   }
 
-  public BoundedInMemoryExecutor(final long bufferLimitInBytes, 
BoundedInMemoryQueueProducer<I> producer,
-                                 Option<BoundedInMemoryQueueConsumer<O, E>> 
consumer, final Function<I, O> transformFunction, Runnable preExecuteRunnable) {
+  public BoundedInMemoryExecutor(final long bufferLimitInBytes, 
HoodieProducer<I> producer,
+                                 Option<IteratorBasedQueueConsumer<O, E>> 
consumer, final Function<I, O> transformFunction, Runnable preExecuteRunnable) {
     this(bufferLimitInBytes, Collections.singletonList(producer), consumer, 
transformFunction, new DefaultSizeEstimator<>(), preExecuteRunnable);
   }
 
-  public BoundedInMemoryExecutor(final long bufferLimitInBytes, 
List<BoundedInMemoryQueueProducer<I>> producers,
-                                 Option<BoundedInMemoryQueueConsumer<O, E>> 
consumer, final Function<I, O> transformFunction,
+  public BoundedInMemoryExecutor(final long bufferLimitInBytes, 
List<HoodieProducer<I>> producers,
+                                 Option<IteratorBasedQueueConsumer<O, E>> 
consumer, final Function<I, O> transformFunction,
                                  final SizeEstimator<O> sizeEstimator, 
Runnable preExecuteRunnable) {
-    this.producers = producers;
-    this.consumer = consumer;
-    this.preExecuteRunnable = preExecuteRunnable;
-    // Ensure fixed thread for each producer thread
-    this.producerExecutorService = 
Executors.newFixedThreadPool(producers.size(), new 
CustomizedThreadFactory("producer"));
-    // Ensure single thread for consumer
-    this.consumerExecutorService = Executors.newSingleThreadExecutor(new 
CustomizedThreadFactory("consumer"));
-    this.queue = new BoundedInMemoryQueue<>(bufferLimitInBytes, 
transformFunction, sizeEstimator);
+    super(producers, consumer, preExecuteRunnable);
+    this.queue = new BoundedInMemoryQueueIterable<>(bufferLimitInBytes, 
transformFunction, sizeEstimator);
   }
 
   /**
-   * Start all Producers.
+   * Start all producers at once.
    */
-  public ExecutorCompletionService<Boolean> startProducers() {
+  @Override
+  public CompletableFuture<Void> startProducers() {
     // Latch to control when and which producer thread will close the queue
     final CountDownLatch latch = new CountDownLatch(producers.size());
-    final ExecutorCompletionService<Boolean> completionService =
-        new ExecutorCompletionService<Boolean>(producerExecutorService);
-    producers.stream().map(producer -> {
-      return completionService.submit(() -> {
+
+    return CompletableFuture.allOf(producers.stream().map(producer -> {
+      return CompletableFuture.supplyAsync(() -> {
         try {
-          preExecuteRunnable.run();
           producer.produce(queue);
         } catch (Throwable e) {
           LOG.error("error producing records", e);
           queue.markAsFailed(e);
-          throw e;
+          throw new HoodieException("Error producing records in bounded in 
memory executor", e);
         } finally {
           synchronized (latch) {
             latch.countDown();
             if (latch.getCount() == 0) {
               // Mark production as done so that consumer will be able to exit
-              queue.close();
+              try {
+                queue.close();
+              } catch (IOException e) {
+                throw new HoodieIOException("Catch Exception when closing 
BoundedInMemoryQueue.", e);
+              }
             }
           }
         }
         return true;
-      });
-    }).collect(Collectors.toList());
-    return completionService;
+      }, producerExecutorService);
+    }).toArray(CompletableFuture[]::new));
   }
 
   /**
    * Start only consumer.
    */
-  private Future<E> startConsumer() {
+  @Override
+  protected CompletableFuture<E> startConsumer() {
     return consumer.map(consumer -> {
-      return consumerExecutorService.submit(() -> {
+      return CompletableFuture.supplyAsync(() -> {
         LOG.info("starting consumer thread");
-        preExecuteRunnable.run();
         try {
           E result = consumer.consume(queue);
           LOG.info("Queue Consumption is done; notifying producer threads");
@@ -138,61 +117,41 @@ public class BoundedInMemoryExecutor<I, O, E> {
         } catch (Exception e) {
           LOG.error("error consuming records", e);
           queue.markAsFailed(e);
-          throw e;
+          throw new HoodieException(e);
         }
-      });
+      }, consumerExecutorService);
     }).orElse(CompletableFuture.completedFuture(null));
   }
 
-  /**
-   * Main API to run both production and consumption.
-   */
-  public E execute() {
-    try {
-      startProducers();
-      Future<E> future = startConsumer();
-      // Wait for consumer to be done
-      return future.get();
-    } catch (InterruptedException ie) {
-      shutdownNow();
-      Thread.currentThread().interrupt();
-      throw new HoodieException(ie);
-    } catch (Exception e) {
-      throw new HoodieException(e);
-    }
+  @Override
+  public boolean isRemaining() {
+    return getQueue().iterator().hasNext();
   }
 
-  public boolean isRemaining() {
-    return queue.iterator().hasNext();
+  @Override
+  protected void postAction() {
+    super.close();
   }
 
+  @Override
   public void shutdownNow() {
     producerExecutorService.shutdownNow();
     consumerExecutorService.shutdownNow();
     // close queue to force producer stop
-    queue.close();
-  }
-
-  public boolean awaitTermination() {
-    // if current thread has been interrupted before awaitTermination was 
called, we still give
-    // executor a chance to proceeding. So clear the interrupt flag and reset 
it if needed before return.
-    boolean interruptedBefore = Thread.interrupted();
-    boolean producerTerminated = false;
-    boolean consumerTerminated = false;
     try {
-      producerTerminated = 
producerExecutorService.awaitTermination(TERMINATE_WAITING_TIME_SECS, 
TimeUnit.SECONDS);
-      consumerTerminated = 
consumerExecutorService.awaitTermination(TERMINATE_WAITING_TIME_SECS, 
TimeUnit.SECONDS);
-    } catch (InterruptedException ie) {
-      // fail silently for any other interruption
+      queue.close();
+    } catch (IOException e) {
+      throw new HoodieIOException("catch IOException while closing 
HoodieMessageQueue", e);
     }
-    // reset interrupt flag if needed
-    if (interruptedBefore) {
-      Thread.currentThread().interrupt();
-    }
-    return producerTerminated && consumerTerminated;
   }
 
-  public BoundedInMemoryQueue<I, O> getQueue() {
-    return queue;
+  @Override
+  public BoundedInMemoryQueueIterable<I, O> getQueue() {
+    return (BoundedInMemoryQueueIterable<I, O>)queue;
+  }
+
+  @Override
+  protected void setup() {
+    // do nothing.
   }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueue.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueueIterable.java
similarity index 94%
rename from 
hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueue.java
rename to 
hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueueIterable.java
index dfe33b49ec..47b8c81fc4 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueue.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueueIterable.java
@@ -49,7 +49,7 @@ import java.util.function.Function;
  * @param <I> input payload data type
  * @param <O> output payload data type
  */
-public class BoundedInMemoryQueue<I, O> implements Iterable<O> {
+public class BoundedInMemoryQueueIterable<I, O> extends 
HoodieIterableMessageQueue<I, O> {
 
   /** Interval used for polling records in the queue. **/
   public static final int RECORD_POLL_INTERVAL_SEC = 1;
@@ -60,7 +60,7 @@ public class BoundedInMemoryQueue<I, O> implements 
Iterable<O> {
   /** Maximum records that will be cached. **/
   private static final int RECORD_CACHING_LIMIT = 128 * 1024;
 
-  private static final Logger LOG = 
LogManager.getLogger(BoundedInMemoryQueue.class);
+  private static final Logger LOG = 
LogManager.getLogger(BoundedInMemoryQueueIterable.class);
 
   /**
    * It indicates number of records to cache. We will be using sampled 
record's average size to
@@ -116,7 +116,7 @@ public class BoundedInMemoryQueue<I, O> implements 
Iterable<O> {
    * @param memoryLimit MemoryLimit in bytes
    * @param transformFunction Transformer Function to convert input payload 
type to stored payload type
    */
-  public BoundedInMemoryQueue(final long memoryLimit, final Function<I, O> 
transformFunction) {
+  public BoundedInMemoryQueueIterable(final long memoryLimit, final 
Function<I, O> transformFunction) {
     this(memoryLimit, transformFunction, new DefaultSizeEstimator() {});
   }
 
@@ -127,15 +127,16 @@ public class BoundedInMemoryQueue<I, O> implements 
Iterable<O> {
    * @param transformFunction Transformer Function to convert input payload 
type to stored payload type
    * @param payloadSizeEstimator Payload Size Estimator
    */
-  public BoundedInMemoryQueue(final long memoryLimit, final Function<I, O> 
transformFunction,
-      final SizeEstimator<O> payloadSizeEstimator) {
+  public BoundedInMemoryQueueIterable(final long memoryLimit, final 
Function<I, O> transformFunction,
+                                      final SizeEstimator<O> 
payloadSizeEstimator) {
     this.memoryLimit = memoryLimit;
     this.transformFunction = transformFunction;
     this.payloadSizeEstimator = payloadSizeEstimator;
     this.iterator = new QueueIterator();
   }
 
-  public int size() {
+  @Override
+  public long size() {
     return this.queue.size();
   }
 
@@ -174,6 +175,7 @@ public class BoundedInMemoryQueue<I, O> implements 
Iterable<O> {
    *
    * @param t Item to be queued
    */
+  @Override
   public void insertRecord(I t) throws Exception {
     // If already closed, throw exception
     if (isWriteDone.get()) {
@@ -203,7 +205,8 @@ public class BoundedInMemoryQueue<I, O> implements 
Iterable<O> {
    * Reader interface but never exposed to outside world as this is a single 
consumer queue. Reading is done through a
    * singleton iterator for this queue.
    */
-  private Option<O> readNextRecord() {
+  @Override
+  public Option<O> readNextRecord() {
     if (this.isReadDone.get()) {
       return Option.empty();
     }
@@ -237,6 +240,7 @@ public class BoundedInMemoryQueue<I, O> implements 
Iterable<O> {
   /**
    * Puts an empty entry to queue to denote termination.
    */
+  @Override
   public void close() {
     // done queueing records notifying queue-reader.
     isWriteDone.set(true);
@@ -252,6 +256,7 @@ public class BoundedInMemoryQueue<I, O> implements 
Iterable<O> {
   /**
    * API to allow producers and consumer to communicate termination due to 
failure.
    */
+  @Override
   public void markAsFailed(Throwable e) {
     this.hasFailed.set(e);
     // release the permits so that if the queueing thread is waiting for 
permits then it will
@@ -259,6 +264,11 @@ public class BoundedInMemoryQueue<I, O> implements 
Iterable<O> {
     this.rateLimiter.release(RECORD_CACHING_LIMIT + 1);
   }
 
+  @Override
+  public boolean isEmpty() {
+    return this.queue.size() == 0;
+  }
+
   @Override
   public Iterator<O> iterator() {
     return iterator;
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorExecutor.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorExecutor.java
new file mode 100644
index 0000000000..7ea5de07c0
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorExecutor.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util.queue;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+/**
+ * Executor which orchestrates concurrent producers and consumers 
communicating through 'DisruptorMessageQueue'. This
+ * class takes as queue producer(s), consumer and transformer and exposes API 
to orchestrate
+ * concurrent execution of these actors communicating through disruptor
+ */
+public class DisruptorExecutor<I, O, E> extends HoodieExecutorBase<I, O, E> {
+
+  private static final Logger LOG = 
LogManager.getLogger(DisruptorExecutor.class);
+  private final HoodieMessageQueue<I, O> queue;
+
+  public DisruptorExecutor(final Option<Integer> bufferSize, final Iterator<I> 
inputItr,
+                           IteratorBasedQueueConsumer<O, E> consumer, 
Function<I, O> transformFunction, Option<String> waitStrategy, Runnable 
preExecuteRunnable) {
+    this(bufferSize, new IteratorBasedQueueProducer<>(inputItr), 
Option.of(consumer), transformFunction, waitStrategy, preExecuteRunnable);
+  }
+
+  public DisruptorExecutor(final Option<Integer> bufferSize, HoodieProducer<I> 
producer,
+                           Option<IteratorBasedQueueConsumer<O, E>> consumer, 
final Function<I, O> transformFunction, Option<String> waitStrategy, Runnable 
preExecuteRunnable) {
+    this(bufferSize, Collections.singletonList(producer), consumer, 
transformFunction, waitStrategy, preExecuteRunnable);
+  }
+
+  public DisruptorExecutor(final Option<Integer> bufferSize, 
List<HoodieProducer<I>> producers,
+                           Option<IteratorBasedQueueConsumer<O, E>> consumer, 
final Function<I, O> transformFunction,
+                           final Option<String> waitStrategy, Runnable 
preExecuteRunnable) {
+    super(producers, consumer, preExecuteRunnable);
+    this.queue = new DisruptorMessageQueue<>(bufferSize, transformFunction, 
waitStrategy, producers.size(), preExecuteRunnable);
+  }
+
+  /**
+   * Start all Producers.
+   */
+  @Override
+  public CompletableFuture<Void> startProducers() {
+    return CompletableFuture.allOf(producers.stream().map(producer -> {
+      return CompletableFuture.supplyAsync(() -> {
+        try {
+          producer.produce(queue);
+        } catch (Throwable e) {
+          LOG.error("error producing records", e);
+          throw new HoodieException("Error producing records in disruptor 
executor", e);
+        }
+        return true;
+      }, producerExecutorService);
+    }).toArray(CompletableFuture[]::new));
+  }
+
+  @Override
+  protected void setup() {
+    ((DisruptorMessageQueue)queue).setHandlers(consumer.get());
+    ((DisruptorMessageQueue)queue).start();
+  }
+
+  @Override
+  protected void postAction() {
+    try {
+      super.close();
+      queue.close();
+    } catch (IOException e) {
+      throw new HoodieIOException("Catch IOException while closing 
DisruptorMessageQueue", e);
+    }
+  }
+
+  @Override
+  protected CompletableFuture<E> startConsumer() {
+    return producerFuture.thenApplyAsync(res -> {
+      try {
+        queue.close();
+        consumer.get().finish();
+        return consumer.get().getResult();
+      } catch (IOException e) {
+        throw new HoodieIOException("Catch Exception when closing", e);
+      }
+    }, consumerExecutorService);
+  }
+
+  @Override
+  public boolean isRemaining() {
+    return !queue.isEmpty();
+  }
+
+  @Override
+  public void shutdownNow() {
+    producerExecutorService.shutdownNow();
+    consumerExecutorService.shutdownNow();
+    try {
+      queue.close();
+    } catch (IOException e) {
+      throw new HoodieIOException("Catch IOException while closing DisruptorMessageQueue", e);
+    }
+  }
+
+  @Override
+  public DisruptorMessageQueue<I, O> getQueue() {
+    return (DisruptorMessageQueue<I, O>)queue;
+  }
+}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorMessageQueue.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorMessageQueue.java
new file mode 100644
index 0000000000..eccd881af1
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorMessageQueue.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util.queue;
+
+import com.lmax.disruptor.EventFactory;
+import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.EventTranslator;
+import com.lmax.disruptor.RingBuffer;
+import com.lmax.disruptor.WaitStrategy;
+import com.lmax.disruptor.dsl.Disruptor;
+import com.lmax.disruptor.dsl.ProducerType;
+import org.apache.hudi.common.util.CustomizedThreadFactory;
+import org.apache.hudi.common.util.Option;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Function;
+
+public class DisruptorMessageQueue<I, O> implements HoodieMessageQueue<I, O> {
+
+  private static final Logger LOG = 
LogManager.getLogger(DisruptorMessageQueue.class);
+
+  private final Disruptor<HoodieDisruptorEvent> queue;
+  private final Function<I, O> transformFunction;
+  private final RingBuffer<HoodieDisruptorEvent> ringBuffer;
+  private final Lock closeLocker = new ReentrantLock();
+
+  private boolean isDisruptorClosed = false;
+
+  public DisruptorMessageQueue(Option<Integer> bufferSize, Function<I, O> 
transformFunction, Option<String> waitStrategyName, int totalProducers, 
Runnable preExecuteRunnable) {
+    WaitStrategy waitStrategy = WaitStrategyFactory.build(waitStrategyName);
+    CustomizedThreadFactory threadFactory = new 
CustomizedThreadFactory("disruptor", true, preExecuteRunnable);
+
+    this.queue = new Disruptor<>(new HoodieDisruptorEventFactory(), 
bufferSize.get(), threadFactory, totalProducers > 1 ? ProducerType.MULTI : 
ProducerType.SINGLE, waitStrategy);
+    this.ringBuffer = queue.getRingBuffer();
+    this.transformFunction = transformFunction;
+  }
+
+  @Override
+  public long size() {
+    return ringBuffer.getBufferSize() - ringBuffer.remainingCapacity();
+  }
+
+  @Override
+  public void insertRecord(I value) throws Exception {
+    O applied = transformFunction.apply(value);
+    EventTranslator<HoodieDisruptorEvent> translator = (event, sequence) -> 
event.set(applied);
+    queue.getRingBuffer().publishEvent(translator);
+  }
+
+  @Override
+  public Option<O> readNextRecord() {
+    throw new UnsupportedOperationException("Should not call readNextRecord 
here. And let DisruptorMessageHandler to handle consuming logic");
+  }
+
+  @Override
+  public void markAsFailed(Throwable e) {
+    // do nothing.
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return ringBuffer.getBufferSize() == ringBuffer.remainingCapacity();
+  }
+
+  @Override
+  public void close() {
+    closeLocker.lock();
+    try {
+      if (!isDisruptorClosed) {
+        queue.shutdown();
+        isDisruptorClosed = true;
+      }
+    } finally {
+      closeLocker.unlock();
+    }
+  }
+
+  protected void setHandlers(IteratorBasedQueueConsumer consumer) {
+    queue.handleEventsWith(new EventHandler<HoodieDisruptorEvent>() {
+
+      @Override
+      public void onEvent(HoodieDisruptorEvent event, long sequence, boolean 
endOfBatch) throws Exception {
+        consumer.consumeOneRecord(event.get());
+      }
+    });
+  }
+
+  protected void start() {
+    queue.start();
+  }
+
+  /**
+   * HoodieDisruptorEventFactory is used to create/preallocate 
HoodieDisruptorEvent.
+   *
+   */
+  class HoodieDisruptorEventFactory implements 
EventFactory<HoodieDisruptorEvent> {
+
+    @Override
+    public HoodieDisruptorEvent newInstance() {
+      return new HoodieDisruptorEvent();
+    }
+  }
+
+  /**
+   * The unit of data passed from producer to consumer in disruptor world.
+   */
+  class HoodieDisruptorEvent {
+
+    private O value;
+
+    public void set(O value) {
+      this.value = value;
+    }
+
+    public O get() {
+      return this.value;
+    }
+  }
+}
+
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorWaitStrategyType.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorWaitStrategyType.java
new file mode 100644
index 0000000000..1a8e86835d
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorWaitStrategyType.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util.queue;
+
+import org.apache.hudi.keygen.constant.KeyGeneratorType;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public enum DisruptorWaitStrategyType {
+
+  /**
+   * The BlockingWaitStrategy is the slowest of the available wait strategies, 
but is the most conservative with respect to CPU usage
+   * and will give the most consistent behaviour across the widest variety of 
deployment options.
+   */
+  BLOCKING_WAIT,
+
+  /**
+   * Like the `BlockingWaitStrategy`, the `SleepingWaitStrategy` attempts to 
be conservative with CPU usage by using a simple busy wait loop.
+   * The difference is that the `SleepingWaitStrategy` uses a call to 
`LockSupport.parkNanos(1)` in the middle of the loop.
+   * On a typical Linux system this will pause the thread for around 60µs.
+   */
+  SLEEPING_WAIT,
+
+  /**
+   * The `YieldingWaitStrategy` is one of two WaitStrategies that can be used 
in low-latency systems.
+   * It is designed for cases where there is the option to burn CPU cycles 
with the goal of improving latency.
+   * The `YieldingWaitStrategy` will busy spin, waiting for the sequence to 
increment to the appropriate value.
+   * Inside the body of the loop `Thread#yield()` will be called allowing 
other queued threads to run.
+   * This is the recommended wait strategy when you need very high 
performance, and the number of `EventHandler` threads is lower than the total 
number of logical cores,
+   * e.g. you have hyper-threading enabled.
+   */
+  YIELDING_WAIT,
+
+  /**
+   * The `BusySpinWaitStrategy` is the highest performing WaitStrategy.
+   * Like the `YieldingWaitStrategy`, it can be used in low-latency systems, 
but puts the highest constraints on the deployment environment.
+   */
+  BUSY_SPIN_WAIT;
+
+  public static List<String> getNames() {
+    List<String> names = new ArrayList<>(DisruptorWaitStrategyType.values().length);
+    Arrays.stream(DisruptorWaitStrategyType.values())
+        .forEach(x -> names.add(x.name()));
+    return names;
+  }
+}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueueConsumer.java
 b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/ExecutorType.java
similarity index 50%
copy from 
hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueueConsumer.java
copy to 
hudi-common/src/main/java/org/apache/hudi/common/util/queue/ExecutorType.java
index c34842fbe3..05ecb1746c 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueueConsumer.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/ExecutorType.java
@@ -18,44 +18,32 @@
 
 package org.apache.hudi.common.util.queue;
 
-import java.util.Iterator;
+import org.apache.hudi.keygen.constant.KeyGeneratorType;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 
 /**
- * Consume entries from queue and execute callback function.
+ * Types of {@link org.apache.hudi.common.util.queue.HoodieExecutor}.
  */
-public abstract class BoundedInMemoryQueueConsumer<I, O> {
-
-  /**
-   * API to de-queue entries to memory bounded queue.
-   *
-   * @param queue In Memory bounded queue
-   */
-  public O consume(BoundedInMemoryQueue<?, I> queue) throws Exception {
-    Iterator<I> iterator = queue.iterator();
-
-    while (iterator.hasNext()) {
-      consumeOneRecord(iterator.next());
-    }
-
-    // Notifies done
-    finish();
-
-    return getResult();
-  }
-
-  /**
-   * Consumer One record.
-   */
-  protected abstract void consumeOneRecord(I record);
+public enum ExecutorType {
 
   /**
-   * Notifies implementation that we have exhausted consuming records from 
queue.
+   * Executor which orchestrates concurrent producers and consumers 
communicating through a bounded in-memory message queue using 
LinkedBlockingQueue.
    */
-  protected abstract void finish();
+  BOUNDED_IN_MEMORY,
 
   /**
-   * Return result of consuming records so far.
+   * Executor which orchestrates concurrent producers and consumers 
communicating through disruptor as a lock free message queue
+   * to gain better writing performance. Note that DisruptorExecutor is still 
an experimental feature.
    */
-  protected abstract O getResult();
+  DISRUPTOR;
 
+  public static List<String> getNames() {
+    List<String> names = new ArrayList<>(ExecutorType.values().length);
+    Arrays.stream(ExecutorType.values())
+        .forEach(x -> names.add(x.name()));
+    return names;
+  }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/FunctionBasedQueueProducer.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/FunctionBasedQueueProducer.java
index 549683754c..df19915878 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/FunctionBasedQueueProducer.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/FunctionBasedQueueProducer.java
@@ -28,18 +28,18 @@ import java.util.function.Function;
  *
  * @param <I> Type of entry produced for queue
  */
-public class FunctionBasedQueueProducer<I> implements 
BoundedInMemoryQueueProducer<I> {
+public class FunctionBasedQueueProducer<I> implements HoodieProducer<I> {
 
   private static final Logger LOG = 
LogManager.getLogger(FunctionBasedQueueProducer.class);
 
-  private final Function<BoundedInMemoryQueue<I, ?>, Boolean> producerFunction;
+  private final Function<HoodieMessageQueue<I, ?>, Boolean> producerFunction;
 
-  public FunctionBasedQueueProducer(Function<BoundedInMemoryQueue<I, ?>, 
Boolean> producerFunction) {
+  public FunctionBasedQueueProducer(Function<HoodieMessageQueue<I, ?>, 
Boolean> producerFunction) {
     this.producerFunction = producerFunction;
   }
 
   @Override
-  public void produce(BoundedInMemoryQueue<I, ?> queue) {
+  public void produce(HoodieMessageQueue<I, ?> queue) {
     LOG.info("starting function which will enqueue records");
     producerFunction.apply(queue);
     LOG.info("finished function which will enqueue records");
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueueProducer.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieConsumer.java
similarity index 69%
copy from 
hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueueProducer.java
copy to 
hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieConsumer.java
index ecea9f2193..fb67fab6c7 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueueProducer.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieConsumer.java
@@ -19,16 +19,12 @@
 package org.apache.hudi.common.util.queue;
 
 /**
- * Producer for {@link BoundedInMemoryQueue}. Memory Bounded Buffer supports 
multiple producers single consumer pattern.
- *
- * @param <I> Input type for buffer items produced
+ * HoodieConsumer is used to consume records/messages from hoodie inner 
message queue and write into DFS.
  */
-public interface BoundedInMemoryQueueProducer<I> {
+public interface HoodieConsumer<I, O> {
 
   /**
-   * API to enqueue entries to memory bounded queue.
-   *
-   * @param queue In Memory bounded queue
+   * Consume records from inner message queue.
    */
-  void produce(BoundedInMemoryQueue<I, ?> queue) throws Exception;
+  O consume(HoodieMessageQueue<?, I> queue) throws Exception;
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueueProducer.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieExecutor.java
similarity index 63%
copy from 
hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueueProducer.java
copy to 
hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieExecutor.java
index ecea9f2193..7d51441ede 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueueProducer.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieExecutor.java
@@ -18,17 +18,27 @@
 
 package org.apache.hudi.common.util.queue;
 
+import java.io.Closeable;
+
 /**
- * Producer for {@link BoundedInMemoryQueue}. Memory Bounded Buffer supports 
multiple producers single consumer pattern.
- *
- * @param <I> Input type for buffer items produced
+ * HoodieExecutor which orchestrates concurrent producers and consumers 
communicating through a bounded in-memory message queue.
  */
-public interface BoundedInMemoryQueueProducer<I> {
+public interface HoodieExecutor<I, O, E> extends Closeable {
 
   /**
-   * API to enqueue entries to memory bounded queue.
-   *
-   * @param queue In Memory bounded queue
+   * Main API to
+   * 1. Set up and run all the production
+   * 2. Set up and run all the consumption.
+   * 3. Shutdown and return the result.
    */
-  void produce(BoundedInMemoryQueue<I, ?> queue) throws Exception;
+  E execute();
+
+  boolean isRemaining();
+
+  /**
+   * Shutdown all the consumers and producers.
+   */
+  void shutdownNow();
+
+  boolean awaitTermination();
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieExecutorBase.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieExecutorBase.java
new file mode 100644
index 0000000000..8dc07e35e4
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieExecutorBase.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util.queue;
+
+import org.apache.hudi.common.util.CustomizedThreadFactory;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * HoodieExecutorBase holds common elements producerExecutorService, 
consumerExecutorService, producers and a single consumer.
+ * Also HoodieExecutorBase control the lifecycle of producerExecutorService 
and consumerExecutorService.
+ */
+public abstract class HoodieExecutorBase<I, O, E> implements HoodieExecutor<I, 
O, E> {
+
+  private static final Logger LOG = 
LogManager.getLogger(HoodieExecutorBase.class);
+
+  private static final long TERMINATE_WAITING_TIME_SECS = 60L;
+  // Executor service used for launching write thread.
+  protected final ExecutorService producerExecutorService;
+  // Executor service used for launching read thread.
+  protected final ExecutorService consumerExecutorService;
+  // Producers
+  protected final List<HoodieProducer<I>> producers;
+  // Consumer
+  protected final Option<IteratorBasedQueueConsumer<O, E>> consumer;
+  // pre-execute function to implement environment specific behavior before 
executors (producers/consumer) run
+  protected final Runnable preExecuteRunnable;
+
+  CompletableFuture<Void> producerFuture;
+
+  public HoodieExecutorBase(List<HoodieProducer<I>> producers, 
Option<IteratorBasedQueueConsumer<O, E>> consumer,
+                            Runnable preExecuteRunnable) {
+    this.producers = producers;
+    this.consumer = consumer;
+    this.preExecuteRunnable = preExecuteRunnable;
+    // Ensure fixed thread for each producer thread
+    this.producerExecutorService = 
Executors.newFixedThreadPool(producers.size(), new 
CustomizedThreadFactory("executor-queue-producer", preExecuteRunnable));
+    // Ensure single thread for consumer
+    this.consumerExecutorService = Executors.newSingleThreadExecutor(new 
CustomizedThreadFactory("executor-queue-consumer", preExecuteRunnable));
+  }
+
+  /**
+   * Start all Producers.
+   */
+  public abstract CompletableFuture<Void> startProducers();
+
+  /**
+   * Start consumer.
+   */
+  protected abstract CompletableFuture<E> startConsumer();
+
+  /**
+   * Closing/cleaning up the executor's resources after consuming finished.
+   */
+  protected abstract void postAction();
+
+  /**
+   * Get the inner bounded in-memory message queue.
+   */
+  public abstract HoodieMessageQueue<I, O> getQueue();
+
+  /**
+   * Set up all the resources for the current HoodieExecutor before starting to produce 
and consume records.
+   */
+  protected abstract void setup();
+
+  @Override
+  public boolean awaitTermination() {
+    // if current thread has been interrupted before awaitTermination was 
called, we still give
+    // executor a chance to proceeding. So clear the interrupt flag and reset 
it if needed before return.
+    boolean interruptedBefore = Thread.interrupted();
+    boolean producerTerminated = false;
+    boolean consumerTerminated = false;
+    try {
+      producerTerminated = 
producerExecutorService.awaitTermination(TERMINATE_WAITING_TIME_SECS, 
TimeUnit.SECONDS);
+      consumerTerminated = 
consumerExecutorService.awaitTermination(TERMINATE_WAITING_TIME_SECS, 
TimeUnit.SECONDS);
+    } catch (InterruptedException ie) {
+      // fail silently for any other interruption
+    }
+    // reset interrupt flag if needed
+    if (interruptedBefore) {
+      Thread.currentThread().interrupt();
+    }
+    return producerTerminated && consumerTerminated;
+  }
+
+  @Override
+  public void close() {
+    if (!producerExecutorService.isShutdown()) {
+      producerExecutorService.shutdown();
+    }
+    if (!consumerExecutorService.isShutdown()) {
+      consumerExecutorService.shutdown();
+    }
+  }
+
+  /**
+   * Main API to run both production and consumption.
+   */
+  @Override
+  public E execute() {
+    try {
+      ValidationUtils.checkState(this.consumer.isPresent());
+      setup();
+      producerFuture = startProducers();
+      CompletableFuture<E> future = startConsumer();
+      return future.get();
+    } catch (InterruptedException ie) {
+      shutdownNow();
+      Thread.currentThread().interrupt();
+      throw new HoodieException(ie);
+    } catch (Exception e) {
+      throw new HoodieException(e);
+    } finally {
+      postAction();
+    }
+  }
+}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueueProducer.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieIterableMessageQueue.java
similarity index 68%
copy from 
hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueueProducer.java
copy to 
hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieIterableMessageQueue.java
index ecea9f2193..71ef39f2c1 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueueProducer.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieIterableMessageQueue.java
@@ -18,17 +18,12 @@
 
 package org.apache.hudi.common.util.queue;
 
+import java.util.Iterator;
+
 /**
- * Producer for {@link BoundedInMemoryQueue}. Memory Bounded Buffer supports 
multiple producers single consumer pattern.
- *
- * @param <I> Input type for buffer items produced
+ * HoodieIterableMessageQueue implements HoodieMessageQueue with Iterable.
  */
-public interface BoundedInMemoryQueueProducer<I> {
+public abstract class HoodieIterableMessageQueue<I, O> implements 
HoodieMessageQueue<I, O>, Iterable<O> {
 
-  /**
-   * API to enqueue entries to memory bounded queue.
-   *
-   * @param queue In Memory bounded queue
-   */
-  void produce(BoundedInMemoryQueue<I, ?> queue) throws Exception;
+  public abstract Iterator<O> iterator();
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueueProducer.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieMessageQueue.java
similarity index 53%
copy from 
hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueueProducer.java
copy to 
hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieMessageQueue.java
index ecea9f2193..ae226f8adb 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueueProducer.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieMessageQueue.java
@@ -18,17 +18,36 @@
 
 package org.apache.hudi.common.util.queue;
 
+import org.apache.hudi.common.util.Option;
+import java.io.Closeable;
+
 /**
- * Producer for {@link BoundedInMemoryQueue}. Memory Bounded Buffer supports 
multiple producers single consumer pattern.
- *
- * @param <I> Input type for buffer items produced
+ * HoodieMessageQueue holds an internal message queue, and controls the 
behavior of
+ * 1. insert record into internal message queue.
+ * 2. get record from internal message queue.
+ * 3. close internal message queue.
  */
-public interface BoundedInMemoryQueueProducer<I> {
+public interface HoodieMessageQueue<I, O> extends Closeable {
+
+  /**
+   * Returns the number of elements in this queue.
+   */
+  long size();
+
+  /**
+   * Insert a record into inner message queue.
+   */
+  void insertRecord(I t) throws Exception;
+
+  /**
+   * Read records from inner message queue.
+   */
+  Option<O> readNextRecord();
 
   /**
-   * API to enqueue entries to memory bounded queue.
-   *
-   * @param queue In Memory bounded queue
+   * API to allow producers and consumer to communicate termination due to 
failure.
    */
-  void produce(BoundedInMemoryQueue<I, ?> queue) throws Exception;
+  void markAsFailed(Throwable e);
+
+  boolean isEmpty();
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueueProducer.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieProducer.java
similarity index 76%
rename from 
hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueueProducer.java
rename to 
hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieProducer.java
index ecea9f2193..f56dd4cce9 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueueProducer.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieProducer.java
@@ -19,16 +19,16 @@
 package org.apache.hudi.common.util.queue;
 
 /**
- * Producer for {@link BoundedInMemoryQueue}. Memory Bounded Buffer supports 
multiple producers single consumer pattern.
+ * Producer for {@link HoodieMessageQueue}. Memory Bounded Buffer supports 
the multiple-producer, single-consumer pattern.
  *
  * @param <I> Input type for buffer items produced
  */
-public interface BoundedInMemoryQueueProducer<I> {
+public interface HoodieProducer<I> {
 
   /**
-   * API to enqueue entries to memory bounded queue.
+   * API to enqueue entries to bounded queue.
    *
    * @param queue In Memory bounded queue
    */
-  void produce(BoundedInMemoryQueue<I, ?> queue) throws Exception;
+  void produce(HoodieMessageQueue<I, ?> queue) throws Exception;
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueueConsumer.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/IteratorBasedQueueConsumer.java
similarity index 81%
rename from 
hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueueConsumer.java
rename to 
hudi-common/src/main/java/org/apache/hudi/common/util/queue/IteratorBasedQueueConsumer.java
index c34842fbe3..713d650464 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueueConsumer.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/IteratorBasedQueueConsumer.java
@@ -23,15 +23,17 @@ import java.util.Iterator;
 /**
  * Consume entries from queue and execute callback function.
  */
-public abstract class BoundedInMemoryQueueConsumer<I, O> {
+public abstract class IteratorBasedQueueConsumer<I, O> implements 
HoodieConsumer<I, O> {
 
   /**
    * API to de-queue entries to memory bounded queue.
    *
    * @param queue In Memory bounded queue
    */
-  public O consume(BoundedInMemoryQueue<?, I> queue) throws Exception {
-    Iterator<I> iterator = queue.iterator();
+  @Override
+  public O consume(HoodieMessageQueue<?, I> queue) throws Exception {
+
+    Iterator<I> iterator = ((HoodieIterableMessageQueue) queue).iterator();
 
     while (iterator.hasNext()) {
       consumeOneRecord(iterator.next());
@@ -46,12 +48,12 @@ public abstract class BoundedInMemoryQueueConsumer<I, O> {
   /**
    * Consumer One record.
    */
-  protected abstract void consumeOneRecord(I record);
+  public abstract void consumeOneRecord(I record);
 
   /**
    * Notifies implementation that we have exhausted consuming records from 
queue.
    */
-  protected abstract void finish();
+  public void finish(){}
 
   /**
    * Return result of consuming records so far.
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/IteratorBasedQueueProducer.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/IteratorBasedQueueProducer.java
index 3d11f38e5c..7904fd61eb 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/IteratorBasedQueueProducer.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/IteratorBasedQueueProducer.java
@@ -28,7 +28,7 @@ import java.util.Iterator;
  *
  * @param <I> Item type produced for the buffer.
  */
-public class IteratorBasedQueueProducer<I> implements 
BoundedInMemoryQueueProducer<I> {
+public class IteratorBasedQueueProducer<I> implements HoodieProducer<I> {
 
   private static final Logger LOG = 
LogManager.getLogger(IteratorBasedQueueProducer.class);
 
@@ -40,7 +40,7 @@ public class IteratorBasedQueueProducer<I> implements 
BoundedInMemoryQueueProduc
   }
 
   @Override
-  public void produce(BoundedInMemoryQueue<I, ?> queue) throws Exception {
+  public void produce(HoodieMessageQueue<I, ?> queue) throws Exception {
     LOG.info("starting to buffer records");
     while (inputIterator.hasNext()) {
       queue.insertRecord(inputIterator.next());
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/WaitStrategyFactory.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/WaitStrategyFactory.java
new file mode 100644
index 0000000000..8137d2a136
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/WaitStrategyFactory.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util.queue;
+
+import static 
org.apache.hudi.common.util.queue.DisruptorWaitStrategyType.BLOCKING_WAIT;
+
+import com.lmax.disruptor.BlockingWaitStrategy;
+import com.lmax.disruptor.BusySpinWaitStrategy;
+import com.lmax.disruptor.SleepingWaitStrategy;
+import com.lmax.disruptor.WaitStrategy;
+import com.lmax.disruptor.YieldingWaitStrategy;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+
+public class WaitStrategyFactory {
+
+  public static final String DEFAULT_STRATEGY = BLOCKING_WAIT.name();
+
+  /**
+   * Builds the disruptor WaitStrategy identified by the given name (defaults to BLOCKING_WAIT).
+   */
+  public static WaitStrategy build(Option<String> name) {
+    DisruptorWaitStrategyType strategyType = name.isPresent() ? 
DisruptorWaitStrategyType.valueOf(name.get().toUpperCase()) : BLOCKING_WAIT;
+
+    switch (strategyType) {
+      case BLOCKING_WAIT:
+        return new BlockingWaitStrategy();
+      case SLEEPING_WAIT:
+        return new SleepingWaitStrategy();
+      case YIELDING_WAIT:
+        return new YieldingWaitStrategy();
+      case BUSY_SPIN_WAIT:
+        return new BusySpinWaitStrategy();
+      default:
+        throw new HoodieException("Unsupported Executor Type " + name);
+    }
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
index f44ec67e5e..2f70a10077 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
@@ -28,7 +28,7 @@ import org.apache.hudi.common.util.Functions;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ExternalSpillableMap;
 import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
-import org.apache.hudi.common.util.queue.BoundedInMemoryQueueProducer;
+import org.apache.hudi.common.util.queue.HoodieProducer;
 import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
@@ -229,8 +229,8 @@ public class FormatUtils {
     /**
      * Setup log and parquet reading in parallel. Both write to central buffer.
      */
-    private List<BoundedInMemoryQueueProducer<HoodieRecord<?>>> 
getParallelProducers() {
-      List<BoundedInMemoryQueueProducer<HoodieRecord<?>>> producers = new 
ArrayList<>();
+    private List<HoodieProducer<HoodieRecord<?>>> getParallelProducers() {
+      List<HoodieProducer<HoodieRecord<?>>> producers = new ArrayList<>();
       producers.add(new FunctionBasedQueueProducer<>(buffer -> {
         scanner.scan();
         return null;
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java
index 84c8088650..700a87cbb7 100644
--- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java
@@ -24,8 +24,8 @@ import org.apache.hudi.common.util.DefaultSizeEstimator;
 import org.apache.hudi.common.util.Functions;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
-import org.apache.hudi.common.util.queue.BoundedInMemoryQueueProducer;
 import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer;
+import org.apache.hudi.common.util.queue.HoodieProducer;
 import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer;
 import org.apache.hudi.hadoop.RecordReaderValueIterator;
 import org.apache.hudi.hadoop.SafeParquetRecordReaderWrapper;
@@ -104,8 +104,8 @@ class RealtimeUnmergedRecordReader extends 
AbstractRealtimeRecordReader
   /**
    * Setup log and parquet reading in parallel. Both write to central buffer.
    */
-  private List<BoundedInMemoryQueueProducer<ArrayWritable>> 
getParallelProducers() {
-    List<BoundedInMemoryQueueProducer<ArrayWritable>> producers = new 
ArrayList<>();
+  private List<HoodieProducer<ArrayWritable>> getParallelProducers() {
+    List<HoodieProducer<ArrayWritable>> producers = new ArrayList<>();
     producers.add(new FunctionBasedQueueProducer<>(buffer -> {
       logRecordScanner.scan();
       return null;
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/BoundInMemoryExecutorBenchmark.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/BoundInMemoryExecutorBenchmark.scala
new file mode 100644
index 0000000000..b1d2517374
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/BoundInMemoryExecutorBenchmark.scala
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.benchmark
+
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.HoodieSparkUtils
+import org.apache.spark.SparkConf
+import org.apache.spark.hudi.benchmark.{HoodieBenchmark, HoodieBenchmarkBase}
+import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.{DataFrame, RowFactory, SaveMode, SparkSession}
+
+import scala.util.Random
+
+object BoundInMemoryExecutorBenchmark extends HoodieBenchmarkBase {
+
+  protected val spark: SparkSession = getSparkSession
+
+  val recordNumber = 1000000
+
+  def getSparkSession: SparkSession = SparkSession.builder()
+    .master("local[*]")
+    .appName(this.getClass.getCanonicalName)
+    .withExtensions(new HoodieSparkSessionExtension)
+    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+    .config("spark.sql.session.timeZone", "CTT")
+    .config(sparkConf())
+    .getOrCreate()
+
+  def sparkConf(): SparkConf = {
+    val sparkConf = new SparkConf()
+    if (HoodieSparkUtils.gteqSpark3_2) {
+      sparkConf.set("spark.sql.catalog.spark_catalog",
+        "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
+    }
+    sparkConf
+  }
+
+  private def createDataFrame(number: Int): DataFrame = {
+    val schema = new StructType()
+      .add("c1", IntegerType)
+      .add("c2", StringType)
+
+    val rdd = spark.sparkContext.parallelize(0 to number, 2).map { item =>
+      val c1 = Integer.valueOf(item)
+      val c2 = s"abc"
+      RowFactory.create(c1, c2)
+    }
+    spark.createDataFrame(rdd, schema)
+  }
+
+  /**
+   * OpenJDK 64-Bit Server VM 1.8.0_161-b14 on Linux 3.10.0-693.21.1.el7.x86_64
+   * Intel(R) Xeon(R) Platinum 8259CL CPU @ 2.50GHz
+   * COW Ingestion:                            Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+   * 
------------------------------------------------------------------------------------------------------------------------
+   * BoundInMemory Executor                             5629           5765    
     192          0.2        5628.9       1.0X
+   * Disruptor Executor                                 2772           2862    
     127          0.4        2772.2       2.0X
+   *
+   */
+  private def cowTableDisruptorExecutorBenchmark(tableName: String = 
"executorBenchmark"): Unit = {
+    val df = createDataFrame(recordNumber)
+    withTempDir {f =>
+      val benchmark = new HoodieBenchmark("COW Ingestion", recordNumber)
+      benchmark.addCase("BoundInMemory Executor") { _ =>
+        val finalTableName = tableName + Random.nextInt(10000)
+        df.write.format("hudi")
+          .mode(SaveMode.Overwrite)
+          .option("hoodie.datasource.write.recordkey.field", "c1")
+          .option("hoodie.datasource.write.partitionpath.field", "c2")
+          .option("hoodie.table.name", finalTableName)
+          .option("hoodie.metadata.enable", "false")
+          .option("hoodie.clean.automatic", "false")
+          .option("hoodie.bulkinsert.sort.mode", "NONE")
+          .option("hoodie.insert.shuffle.parallelism", "2")
+          .option("hoodie.datasource.write.operation", "bulk_insert")
+          .option("hoodie.datasource.write.row.writer.enable", "false")
+          .option("hoodie.bulkinsert.shuffle.parallelism", "1")
+          .option("hoodie.upsert.shuffle.parallelism", "2")
+          .option("hoodie.delete.shuffle.parallelism", "2")
+          .option("hoodie.populate.meta.fields", "false")
+          .option("hoodie.table.keygenerator.class", 
"org.apache.hudi.keygen.SimpleKeyGenerator")
+          .save(new Path(f.getCanonicalPath, finalTableName).toUri.toString)
+      }
+
+      benchmark.addCase("Disruptor Executor") { _ =>
+        val finalTableName = tableName + Random.nextInt(10000)
+        df.write.format("hudi")
+          .mode(SaveMode.Overwrite)
+          .option("hoodie.datasource.write.recordkey.field", "c1")
+          .option("hoodie.datasource.write.partitionpath.field", "c2")
+          .option("hoodie.table.name", finalTableName)
+          .option("hoodie.metadata.enable", "false")
+          .option("hoodie.clean.automatic", "false")
+          .option("hoodie.bulkinsert.sort.mode", "NONE")
+          .option("hoodie.insert.shuffle.parallelism", "2")
+          .option("hoodie.datasource.write.operation", "bulk_insert")
+          .option("hoodie.datasource.write.row.writer.enable", "false")
+          .option("hoodie.bulkinsert.shuffle.parallelism", "1")
+          .option("hoodie.upsert.shuffle.parallelism", "2")
+          .option("hoodie.delete.shuffle.parallelism", "2")
+          .option("hoodie.write.executor.type", "DISRUPTOR")
+          .option("hoodie.populate.meta.fields", "false")
+          .option("hoodie.table.keygenerator.class", 
"org.apache.hudi.keygen.SimpleKeyGenerator")
+
+          .save(new Path(f.getCanonicalPath, finalTableName).toUri.toString)
+      }
+      benchmark.run()
+    }
+  }
+
+  override def afterAll(): Unit = {
+    spark.stop()
+  }
+
+  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
+    cowTableDisruptorExecutorBenchmark()
+  }
+}
diff --git a/pom.xml b/pom.xml
index e97647b119..3b02c916f7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -196,6 +196,7 @@
     
<trino.bundle.bootstrap.shade.prefix>org.apache.hudi.</trino.bundle.bootstrap.shade.prefix>
     <shadeSources>true</shadeSources>
     <zk-curator.version>2.7.1</zk-curator.version>
+    <disruptor.version>3.4.2</disruptor.version>
     <antlr.version>4.7</antlr.version>
     <aws.sdk.version>1.12.22</aws.sdk.version>
     <proto.version>3.21.5</proto.version>

Reply via email to