nsivabalan commented on code in PR #5416:
URL: https://github.com/apache/hudi/pull/5416#discussion_r1008719537


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -233,6 +244,16 @@ public class HoodieWriteConfig extends HoodieConfig {
       .defaultValue(String.valueOf(4 * 1024 * 1024))
       .withDocumentation("Size of in-memory buffer used for parallelizing 
network reads and lake storage writes.");
 
+  public static final ConfigProperty<Integer> WRITE_BUFFER_SIZE = 
ConfigProperty
+      .key("hoodie.write.executor.disruptor.buffer.size")
+      .defaultValue(1024)
+      .withDocumentation("The size of the Disruptor Executor ring buffer, must 
be power of 2");
+
+  public static final ConfigProperty<String> WRITE_WAIT_STRATEGY = 
ConfigProperty
+      .key("hoodie.write.executor.disruptor.wait.strategy")
+      .defaultValue("BLOCKING_WAIT")
+      .withDocumentation("Strategy employed for making Disruptor Executor wait 
on a cursor.");

Review Comment:
   Can we add the other possible values to the documentation?



##########
hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorExecutor.java:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util.queue;
+
+import org.apache.hudi.common.util.Functions;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+/**
+ * Executor which orchestrates concurrent producers and consumers 
communicating through 'DisruptorMessageQueue'. This
+ * class takes as queue producer(s), consumer and transformer and exposes API 
to orchestrate
+ * concurrent execution of these actors communicating through disruptor
+ */
+public class DisruptorExecutor<I, O, E> extends HoodieExecutorBase<I, O, E> {
+
+  private static final Logger LOG = 
LogManager.getLogger(DisruptorExecutor.class);
+  private final HoodieMessageQueue<I, O> queue;
+
+  public DisruptorExecutor(final Option<Integer> bufferSize, final Iterator<I> 
inputItr,
+                           IteratorBasedQueueConsumer<O, E> consumer, 
Function<I, O> transformFunction, Option<String> waitStrategy, Runnable 
preExecuteRunnable) {
+    this(bufferSize, new IteratorBasedQueueProducer<>(inputItr), 
Option.of(consumer), transformFunction, waitStrategy, preExecuteRunnable);
+  }
+
+  public DisruptorExecutor(final Option<Integer> bufferSize, HoodieProducer<I> 
producer,
+                           Option<IteratorBasedQueueConsumer<O, E>> consumer, 
final Function<I, O> transformFunction) {
+    this(bufferSize, producer, consumer, transformFunction, 
Option.of(WaitStrategyFactory.DEFAULT_STRATEGY), Functions.noop());
+  }
+
+  public DisruptorExecutor(final Option<Integer> bufferSize, HoodieProducer<I> 
producer,
+                           Option<IteratorBasedQueueConsumer<O, E>> consumer, 
final Function<I, O> transformFunction, Option<String> waitStrategy, Runnable 
preExecuteRunnable) {
+    this(bufferSize, Collections.singletonList(producer), consumer, 
transformFunction, waitStrategy, preExecuteRunnable);
+  }
+
+  public DisruptorExecutor(final Option<Integer> bufferSize, 
List<HoodieProducer<I>> producers,
+                           Option<IteratorBasedQueueConsumer<O, E>> consumer, 
final Function<I, O> transformFunction,
+                           final Option<String> waitStrategy, Runnable 
preExecuteRunnable) {
+    super(producers, consumer, preExecuteRunnable);
+    this.queue = new DisruptorMessageQueue<>(bufferSize, transformFunction, 
waitStrategy, producers.size(), preExecuteRunnable);
+  }
+
+  /**
+   * Start all Producers.
+   */
+  @Override
+  public CompletableFuture<Void> startProducers() {
+    return CompletableFuture.allOf(producers.stream().map(producer -> {
+      return CompletableFuture.supplyAsync(() -> {
+        try {
+          producer.produce(queue);
+        } catch (Throwable e) {
+          LOG.error("error producing records", e);
+          throw new HoodieException("Error producing records in disruptor 
executor", e);
+        }
+        return true;
+      }, producerExecutorService);
+    }).toArray(CompletableFuture[]::new));
+  }
+
+  @Override
+  protected void setup() {
+    ((DisruptorMessageQueue)queue).setHandlers(consumer.get());
+    ((DisruptorMessageQueue)queue).start();
+  }
+
+  @Override
+  protected void postAction() {
+    try {
+      queue.close();

Review Comment:
   In the case of shutdownNow, we shut down the executor services first and then
   call queue.close(), whereas here we close the queue first and then shut down
   the executor services. Which is the right order? Can we align the two?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -1040,6 +1065,14 @@ public int getWriteBufferLimitBytes() {
     return 
Integer.parseInt(getStringOrDefault(WRITE_BUFFER_LIMIT_BYTES_VALUE));
   }
 
+  public Option<String> getWriteExecutorWaitStrategy() {
+    return Option.of(getString(WRITE_WAIT_STRATEGY));
+  }
+
+  public Option<Integer> getWriteBufferSize() {

Review Comment:
   This is confusing alongside the already existing config
   (WRITE_BUFFER_LIMIT_BYTES_VALUE). Can we add units here? Initially I thought
   this was also in bytes, but it looks like it is the ring buffer size. We could
   also add "ring buffer" or "disruptor" to the name, since this applies only to
   the disruptor-based queue.



##########
hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java:
##########
@@ -138,61 +118,41 @@ private Future<E> startConsumer() {
         } catch (Exception e) {
           LOG.error("error consuming records", e);
           queue.markAsFailed(e);
-          throw e;
+          throw new HoodieException(e);
         }
-      });
+      }, consumerExecutorService);
     }).orElse(CompletableFuture.completedFuture(null));
   }
 
-  /**
-   * Main API to run both production and consumption.
-   */
-  public E execute() {
-    try {
-      startProducers();
-      Future<E> future = startConsumer();
-      // Wait for consumer to be done
-      return future.get();
-    } catch (InterruptedException ie) {
-      shutdownNow();
-      Thread.currentThread().interrupt();
-      throw new HoodieException(ie);
-    } catch (Exception e) {
-      throw new HoodieException(e);
-    }
+  @Override
+  public boolean isRemaining() {
+    return ((BoundedInMemoryQueueIterable)queue).iterator().hasNext();

Review Comment:
   Minor: we could use getQueue() and avoid the cast here.



##########
hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorExecutor.java:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util.queue;
+
+import org.apache.hudi.common.util.Functions;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+/**
+ * Executor which orchestrates concurrent producers and consumers 
communicating through 'DisruptorMessageQueue'. This
+ * class takes as queue producer(s), consumer and transformer and exposes API 
to orchestrate
+ * concurrent execution of these actors communicating through disruptor
+ */
+public class DisruptorExecutor<I, O, E> extends HoodieExecutorBase<I, O, E> {
+
+  private static final Logger LOG = 
LogManager.getLogger(DisruptorExecutor.class);
+  private final HoodieMessageQueue<I, O> queue;
+
+  public DisruptorExecutor(final Option<Integer> bufferSize, final Iterator<I> 
inputItr,
+                           IteratorBasedQueueConsumer<O, E> consumer, 
Function<I, O> transformFunction, Option<String> waitStrategy, Runnable 
preExecuteRunnable) {
+    this(bufferSize, new IteratorBasedQueueProducer<>(inputItr), 
Option.of(consumer), transformFunction, waitStrategy, preExecuteRunnable);
+  }
+
+  public DisruptorExecutor(final Option<Integer> bufferSize, HoodieProducer<I> 
producer,
+                           Option<IteratorBasedQueueConsumer<O, E>> consumer, 
final Function<I, O> transformFunction) {
+    this(bufferSize, producer, consumer, transformFunction, 
Option.of(WaitStrategyFactory.DEFAULT_STRATEGY), Functions.noop());
+  }
+
+  public DisruptorExecutor(final Option<Integer> bufferSize, HoodieProducer<I> 
producer,
+                           Option<IteratorBasedQueueConsumer<O, E>> consumer, 
final Function<I, O> transformFunction, Option<String> waitStrategy, Runnable 
preExecuteRunnable) {
+    this(bufferSize, Collections.singletonList(producer), consumer, 
transformFunction, waitStrategy, preExecuteRunnable);
+  }
+
+  public DisruptorExecutor(final Option<Integer> bufferSize, 
List<HoodieProducer<I>> producers,
+                           Option<IteratorBasedQueueConsumer<O, E>> consumer, 
final Function<I, O> transformFunction,
+                           final Option<String> waitStrategy, Runnable 
preExecuteRunnable) {
+    super(producers, consumer, preExecuteRunnable);
+    this.queue = new DisruptorMessageQueue<>(bufferSize, transformFunction, 
waitStrategy, producers.size(), preExecuteRunnable);
+  }
+
+  /**
+   * Start all Producers.
+   */
+  @Override
+  public CompletableFuture<Void> startProducers() {
+    return CompletableFuture.allOf(producers.stream().map(producer -> {
+      return CompletableFuture.supplyAsync(() -> {
+        try {
+          producer.produce(queue);
+        } catch (Throwable e) {
+          LOG.error("error producing records", e);
+          throw new HoodieException("Error producing records in disruptor 
executor", e);
+        }
+        return true;
+      }, producerExecutorService);
+    }).toArray(CompletableFuture[]::new));
+  }
+
+  @Override
+  protected void setup() {
+    ((DisruptorMessageQueue)queue).setHandlers(consumer.get());
+    ((DisruptorMessageQueue)queue).start();
+  }
+
+  @Override
+  protected void postAction() {
+    try {
+      queue.close();
+      super.close();
+    } catch (IOException e) {
+      throw new HoodieIOException("Catch IOException while closing 
DisruptorMessageQueue", e);
+    }
+  }
+
+  @Override
+  protected CompletableFuture<E> startConsumer() {
+    return producerFuture.thenApplyAsync(res -> {
+      try {
+        queue.close();

Review Comment:
   I see we are calling queue.close() from many places. Can you confirm that
   it is idempotent and will not cause any exceptions?



##########
hudi-common/src/main/java/org/apache/hudi/common/util/queue/ExecutorType.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util.queue;
+
+import org.apache.hudi.keygen.constant.KeyGeneratorType;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Types of {@link org.apache.hudi.common.util.queue.HoodieExecutor}.
+ */
+public enum ExecutorType {
+
+  /**
+   * Executor which orchestrates concurrent producers and consumers 
communicating through a bounded in-memory message queue using 
LinkedBlockingQueue.
+   */
+  BOUNDED_IN_MEMORY,
+
+  /**
+   * Executor which orchestrates concurrent producers and consumers 
communicating through disruptor as a lock free message queue
+   * to gain better writing performance. Although DisruptorExecutor is still 
an experimental feature.
+   */
+  DISRUPTOR;
+
+  public static List<String> getNames() {
+    List<String> names = new ArrayList<>(KeyGeneratorType.values().length);

Review Comment:
   Shouldn't this be ExecutorType.values()?



##########
hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieExecutorBase.java:
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util.queue;
+
+import org.apache.hudi.common.util.CustomizedThreadFactory;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * HoodieExecutorBase holds common elements producerExecutorService, 
consumerExecutorService, producers and a single consumer.
+ * Also HoodieExecutorBase control the lifecycle of producerExecutorService 
and consumerExecutorService.
+ */
+public abstract class HoodieExecutorBase<I, O, E> implements HoodieExecutor<I, 
O, E> {
+
+  private static final Logger LOG = 
LogManager.getLogger(HoodieExecutorBase.class);
+
+  private static final long TERMINATE_WAITING_TIME_SECS = 60L;
+  // Executor service used for launching write thread.
+  public final ExecutorService producerExecutorService;

Review Comment:
   Can we use protected wherever applicable? Do we really need public for all
   of these instance variables?



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorMessageQueue.java:
##########
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution;
+
+import static 
org.apache.hudi.execution.HoodieLazyInsertIterable.getTransformFunction;
+
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.common.util.queue.DisruptorMessageQueue;
+import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer;
+import org.apache.hudi.common.util.queue.HoodieProducer;
+import org.apache.hudi.common.util.queue.IteratorBasedQueueConsumer;
+import org.apache.hudi.common.util.queue.DisruptorExecutor;
+import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer;
+import org.apache.hudi.common.util.queue.WaitStrategyFactory;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.testutils.HoodieClientTestHarness;
+import org.apache.spark.TaskContext;
+import org.apache.spark.TaskContext$;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import scala.Tuple2;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestDisruptorMessageQueue extends HoodieClientTestHarness {
+
+  private final String instantTime = 
HoodieActiveTimeline.createNewInstantTime();
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    initTestDataGenerator();
+    initExecutorServiceWithFixedThreadPool(2);
+  }
+
+  @AfterEach
+  public void tearDown() throws Exception {
+    cleanupResources();
+  }
+
+  private Runnable getPreExecuteRunnable() {
+    final TaskContext taskContext = TaskContext.get();
+    return () -> TaskContext$.MODULE$.setTaskContext(taskContext);
+  }
+
+  // Test to ensure that we are reading all records from queue iterator in the 
same order
+  // without any exceptions.
+  @SuppressWarnings("unchecked")
+  @Test
+  @Timeout(value = 60)
+  public void testRecordReading() throws Exception {
+
+    final List<HoodieRecord> hoodieRecords = 
dataGen.generateInserts(instantTime, 100);
+    ArrayList<HoodieRecord> beforeRecord = new ArrayList<>();
+    ArrayList<IndexedRecord> beforeIndexedRecord = new ArrayList<>();
+    ArrayList<HoodieAvroRecord> afterRecord = new ArrayList<>();
+    ArrayList<IndexedRecord> afterIndexedRecord = new ArrayList<>();
+
+    hoodieRecords.forEach(record -> {
+      final HoodieAvroRecord originalRecord = (HoodieAvroRecord) record;
+      beforeRecord.add(originalRecord);
+      try {
+        final Option<IndexedRecord> originalInsertValue =
+            
originalRecord.getData().getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA);
+        beforeIndexedRecord.add(originalInsertValue.get());
+      } catch (IOException e) {
+        // ignore exception here.
+      }
+    });
+
+    HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class);
+    when(hoodieWriteConfig.getWriteBufferSize()).thenReturn(Option.of(16));
+    
IteratorBasedQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
 Integer> consumer =

Review Comment:
   Not required in this patch, but do you think we could have a single set of
   tests parameterized over the two different queues? I am looking to reuse code,
   because the way we test the bounded in-memory queue and the disruptor-based
   one is the same; only the initialization differs. Essentially it is functional
   testing of the queue. Let me know what you think.



##########
hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorExecutor.java:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util.queue;
+
+import org.apache.hudi.common.util.Functions;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+/**
+ * Executor which orchestrates concurrent producers and consumers 
communicating through 'DisruptorMessageQueue'. This
+ * class takes as queue producer(s), consumer and transformer and exposes API 
to orchestrate
+ * concurrent execution of these actors communicating through disruptor
+ */
+public class DisruptorExecutor<I, O, E> extends HoodieExecutorBase<I, O, E> {
+
+  private static final Logger LOG = 
LogManager.getLogger(DisruptorExecutor.class);
+  private final HoodieMessageQueue<I, O> queue;
+
+  public DisruptorExecutor(final Option<Integer> bufferSize, final Iterator<I> 
inputItr,
+                           IteratorBasedQueueConsumer<O, E> consumer, 
Function<I, O> transformFunction, Option<String> waitStrategy, Runnable 
preExecuteRunnable) {
+    this(bufferSize, new IteratorBasedQueueProducer<>(inputItr), 
Option.of(consumer), transformFunction, waitStrategy, preExecuteRunnable);
+  }
+
+  public DisruptorExecutor(final Option<Integer> bufferSize, HoodieProducer<I> 
producer,

Review Comment:
   I guess this constructor is not used. Can we remove it if it is not required?



##########
hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieMessageQueue.java:
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util.queue;
+
+import org.apache.hudi.common.util.Option;
+import java.io.Closeable;
+
+/**
+ * HoodieMessageQueue holds an internal message queue, and control the 
behavior of
+ * 1. insert record into internal message queue.
+ * 2. get record from internal message queue.
+ * 3. close internal message queue.
+ */
+public interface HoodieMessageQueue<I, O> extends Closeable {
+
+  /**
+   * Get the size of inner message queue.
+   */
+  long size();

Review Comment:
   Minor: does this refer to the total size or the pending size? It is not
   apparent from the name.



##########
hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieExecutorBase.java:
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util.queue;
+
+import org.apache.hudi.common.util.CustomizedThreadFactory;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * HoodieExecutorBase holds common elements producerExecutorService, 
consumerExecutorService, producers and a single consumer.
+ * Also HoodieExecutorBase control the lifecycle of producerExecutorService 
and consumerExecutorService.
+ */
+public abstract class HoodieExecutorBase<I, O, E> implements HoodieExecutor<I, 
O, E> {
+
+  private static final Logger LOG = 
LogManager.getLogger(HoodieExecutorBase.class);
+
+  private static final long TERMINATE_WAITING_TIME_SECS = 60L;
+  // Executor service used for launching write thread.
+  public final ExecutorService producerExecutorService;
+  // Executor service used for launching read thread.
+  public final ExecutorService consumerExecutorService;
+  // Producers
+  public final List<HoodieProducer<I>> producers;
+  // Consumer
+  public final Option<IteratorBasedQueueConsumer<O, E>> consumer;
+  // pre-execute function to implement environment specific behavior before 
executors (producers/consumer) run
+  public final Runnable preExecuteRunnable;
+
+  public CompletableFuture<Void> producerFuture;
+
+  public HoodieExecutorBase(List<HoodieProducer<I>> producers, 
Option<IteratorBasedQueueConsumer<O, E>> consumer,
+                            Runnable preExecuteRunnable) {
+    this.producers = producers;
+    this.consumer = consumer;
+    this.preExecuteRunnable = preExecuteRunnable;
+    // Ensure fixed thread for each producer thread
+    this.producerExecutorService = 
Executors.newFixedThreadPool(producers.size(), new 
HoodieDaemonThreadFactory("executor-queue-producer", preExecuteRunnable));
+    // Ensure single thread for consumer
+    this.consumerExecutorService = Executors.newSingleThreadExecutor(new 
CustomizedThreadFactory("executor-queue-consumer"));
+  }
+
+  /**
+   * Start all Producers.
+   */
+  public abstract CompletableFuture<Void> startProducers();
+
+  /**
+   * Start consumer.
+   */
+  protected abstract CompletableFuture<E> startConsumer();
+
+  /**
+   * Closing/cleaning up the executor's resources after consuming finished.
+   */
+  protected abstract void postAction();
+
+  /**
+   * get bounded in message queue.
+   */
+  public abstract HoodieMessageQueue<I, O> getQueue();
+
+  /**
+   * set all the resources for current HoodieExecutor before start to produce 
and consume records.
+   */
+  protected abstract void setup();
+
+  public boolean awaitTermination() {
+    // if current thread has been interrupted before awaitTermination was 
called, we still give
+    // executor a chance to proceeding. So clear the interrupt flag and reset 
it if needed before return.
+    boolean interruptedBefore = Thread.interrupted();
+    boolean producerTerminated = false;
+    boolean consumerTerminated = false;
+    try {
+      producerTerminated = 
producerExecutorService.awaitTermination(TERMINATE_WAITING_TIME_SECS, 
TimeUnit.SECONDS);
+      consumerTerminated = 
consumerExecutorService.awaitTermination(TERMINATE_WAITING_TIME_SECS, 
TimeUnit.SECONDS);
+    } catch (InterruptedException ie) {
+      // fail silently for any other interruption
+    }
+    // reset interrupt flag if needed
+    if (interruptedBefore) {
+      Thread.currentThread().interrupt();
+    }
+    return producerTerminated && consumerTerminated;
+  }
+
+  /**
+   * Shutdown all the consumers and producers.
+   */
+  public void shutdownNow() {

Review Comment:
   Again, let's make these protected or package-private wherever applicable,
   and let's not make a method public just for testing purposes.



##########
hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorMessageQueue.java:
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util.queue;
+
+import com.lmax.disruptor.EventFactory;
+import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.EventTranslator;
+import com.lmax.disruptor.RingBuffer;
+import com.lmax.disruptor.WaitStrategy;
+import com.lmax.disruptor.dsl.Disruptor;
+import com.lmax.disruptor.dsl.ProducerType;
+import org.apache.hudi.common.util.Option;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.function.Function;
+
+public class DisruptorMessageQueue<I, O> implements HoodieMessageQueue<I, O> {
+
+  private static final Logger LOG = 
LogManager.getLogger(DisruptorMessageQueue.class);
+
+  private final Disruptor<HoodieDisruptorEvent> queue;
+  private final Function<I, O> transformFunction;
+  private final RingBuffer<HoodieDisruptorEvent> ringBuffer;
+
+  public DisruptorMessageQueue(Option<Integer> bufferSize, Function<I, O> 
transformFunction, Option<String> waitStrategyName, int totalProducers, 
Runnable preExecuteRunnable) {
+    WaitStrategy waitStrategy = WaitStrategyFactory.build(waitStrategyName);
+    HoodieDaemonThreadFactory threadFactory = new 
HoodieDaemonThreadFactory(preExecuteRunnable);
+
+    this.queue = new Disruptor<>(new HoodieDisruptorEventFactory(), 
bufferSize.get(), threadFactory, totalProducers > 1 ? ProducerType.MULTI : 
ProducerType.SINGLE, waitStrategy);
+    this.ringBuffer = queue.getRingBuffer();
+    this.transformFunction = transformFunction;
+  }
+
+  @Override
+  public long size() {
+    return queue.getBufferSize();
+  }
+
+  @Override
+  public void insertRecord(I value) throws Exception {
+    O applied = transformFunction.apply(value);
+
+    EventTranslator<HoodieDisruptorEvent> translator = new 
EventTranslator<HoodieDisruptorEvent>() {
+      @Override
+      public void translateTo(HoodieDisruptorEvent event, long sequence) {
+        event.set(applied);
+      }
+    };
+
+    queue.getRingBuffer().publishEvent(translator);
+  }
+
+  @Override
+  public Option<O> readNextRecord() {
+    // Let DisruptorMessageHandler to handle consuming logic.
+    return null;
+  }
+
+  @Override
+  public void markAsFailed(Throwable e) {
+    // do nothing.
+  }
+
+  @Override
+  public void close() {
+    // Waits until all events currently in the disruptor have been processed 
by all event processors
+    queue.shutdown();
+  }
+
+  public boolean isEmpty() {
+    return ringBuffer.getBufferSize() == ringBuffer.remainingCapacity();
+  }
+
+  public void setHandlers(IteratorBasedQueueConsumer consumer) {

Review Comment:
   I don't think it's good practice to make methods public just for the 
purpose of testing. Can we please avoid that?



##########
hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java:
##########
@@ -18,117 +18,97 @@
 
 package org.apache.hudi.common.util.queue;
 
-import org.apache.hudi.common.util.CustomizedThreadFactory;
 import org.apache.hudi.common.util.DefaultSizeEstimator;
 import org.apache.hudi.common.util.Functions;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.SizeEstimator;
 import org.apache.hudi.exception.HoodieException;
 
+import org.apache.hudi.exception.HoodieIOException;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
+import java.io.IOException;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
-import java.util.stream.Collectors;
 
 /**
- * Executor which orchestrates concurrent producers and consumers 
communicating through a bounded in-memory queue. This
+ * Executor which orchestrates concurrent producers and consumers 
communicating through 'BoundedInMemoryQueue'. This
  * class takes as input the size limit, queue producer(s), consumer and 
transformer and exposes API to orchestrate
  * concurrent execution of these actors communicating through a central 
bounded queue
  */
-public class BoundedInMemoryExecutor<I, O, E> {
+public class BoundedInMemoryExecutor<I, O, E> extends HoodieExecutorBase<I, O, 
E> {
 
   private static final Logger LOG = 
LogManager.getLogger(BoundedInMemoryExecutor.class);
-  private static final long TERMINATE_WAITING_TIME_SECS = 60L;
-  // Executor service used for launching write thread.
-  private final ExecutorService producerExecutorService;
-  // Executor service used for launching read thread.
-  private final ExecutorService consumerExecutorService;
-  // Used for buffering records which is controlled by 
HoodieWriteConfig#WRITE_BUFFER_LIMIT_BYTES.
-  private final BoundedInMemoryQueue<I, O> queue;
-  // Producers
-  private final List<BoundedInMemoryQueueProducer<I>> producers;
-  // Consumer
-  private final Option<BoundedInMemoryQueueConsumer<O, E>> consumer;
-  // pre-execute function to implement environment specific behavior before 
executors (producers/consumer) run
-  private final Runnable preExecuteRunnable;
+  private final HoodieMessageQueue<I, O> queue;
 
   public BoundedInMemoryExecutor(final long bufferLimitInBytes, final 
Iterator<I> inputItr,
-                                 BoundedInMemoryQueueConsumer<O, E> consumer, 
Function<I, O> transformFunction, Runnable preExecuteRunnable) {
+                                 IteratorBasedQueueConsumer<O, E> consumer, 
Function<I, O> transformFunction, Runnable preExecuteRunnable) {
     this(bufferLimitInBytes, new IteratorBasedQueueProducer<>(inputItr), 
Option.of(consumer), transformFunction, preExecuteRunnable);
   }
 
-  public BoundedInMemoryExecutor(final long bufferLimitInBytes, 
BoundedInMemoryQueueProducer<I> producer,
-                                 Option<BoundedInMemoryQueueConsumer<O, E>> 
consumer, final Function<I, O> transformFunction) {
+  public BoundedInMemoryExecutor(final long bufferLimitInBytes, 
HoodieProducer<I> producer,
+                                 Option<IteratorBasedQueueConsumer<O, E>> 
consumer, final Function<I, O> transformFunction) {
     this(bufferLimitInBytes, producer, consumer, transformFunction, 
Functions.noop());
   }
 
-  public BoundedInMemoryExecutor(final long bufferLimitInBytes, 
BoundedInMemoryQueueProducer<I> producer,
-                                 Option<BoundedInMemoryQueueConsumer<O, E>> 
consumer, final Function<I, O> transformFunction, Runnable preExecuteRunnable) {
+  public BoundedInMemoryExecutor(final long bufferLimitInBytes, 
HoodieProducer<I> producer,
+                                 Option<IteratorBasedQueueConsumer<O, E>> 
consumer, final Function<I, O> transformFunction, Runnable preExecuteRunnable) {
     this(bufferLimitInBytes, Collections.singletonList(producer), consumer, 
transformFunction, new DefaultSizeEstimator<>(), preExecuteRunnable);
   }
 
-  public BoundedInMemoryExecutor(final long bufferLimitInBytes, 
List<BoundedInMemoryQueueProducer<I>> producers,
-                                 Option<BoundedInMemoryQueueConsumer<O, E>> 
consumer, final Function<I, O> transformFunction,
+  public BoundedInMemoryExecutor(final long bufferLimitInBytes, 
List<HoodieProducer<I>> producers,
+                                 Option<IteratorBasedQueueConsumer<O, E>> 
consumer, final Function<I, O> transformFunction,
                                  final SizeEstimator<O> sizeEstimator, 
Runnable preExecuteRunnable) {
-    this.producers = producers;
-    this.consumer = consumer;
-    this.preExecuteRunnable = preExecuteRunnable;
-    // Ensure fixed thread for each producer thread
-    this.producerExecutorService = 
Executors.newFixedThreadPool(producers.size(), new 
CustomizedThreadFactory("producer"));
-    // Ensure single thread for consumer
-    this.consumerExecutorService = Executors.newSingleThreadExecutor(new 
CustomizedThreadFactory("consumer"));
-    this.queue = new BoundedInMemoryQueue<>(bufferLimitInBytes, 
transformFunction, sizeEstimator);
+    super(producers, consumer, preExecuteRunnable);
+    this.queue = new BoundedInMemoryQueueIterable<>(bufferLimitInBytes, 
transformFunction, sizeEstimator);
   }
 
   /**
-   * Start all Producers.
+   * Start all producers at once.
    */
-  public ExecutorCompletionService<Boolean> startProducers() {
+  @Override
+  public CompletableFuture<Void> startProducers() {
     // Latch to control when and which producer thread will close the queue
     final CountDownLatch latch = new CountDownLatch(producers.size());
-    final ExecutorCompletionService<Boolean> completionService =
-        new ExecutorCompletionService<Boolean>(producerExecutorService);
-    producers.stream().map(producer -> {
-      return completionService.submit(() -> {
+
+    return CompletableFuture.allOf(producers.stream().map(producer -> {
+      return CompletableFuture.supplyAsync(() -> {
         try {
-          preExecuteRunnable.run();

Review Comment:
   Previously, preExecuteRunnable was executed in both the producers and the 
consumers. With this patch, I see we are executing it only for consumers. Is that 
intentional?
   Also, I only see it getting executed in the BoundedInMemory impl and not in the 
Disruptor-based impl. Can you check on that, please?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to