zhangyue19921010 commented on code in PR #5416:
URL: https://github.com/apache/hudi/pull/5416#discussion_r1009223711


##########
hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java:
##########
@@ -18,117 +18,97 @@
 
 package org.apache.hudi.common.util.queue;
 
-import org.apache.hudi.common.util.CustomizedThreadFactory;
 import org.apache.hudi.common.util.DefaultSizeEstimator;
 import org.apache.hudi.common.util.Functions;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.SizeEstimator;
 import org.apache.hudi.exception.HoodieException;
 
+import org.apache.hudi.exception.HoodieIOException;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
+import java.io.IOException;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
-import java.util.stream.Collectors;
 
 /**
- * Executor which orchestrates concurrent producers and consumers 
communicating through a bounded in-memory queue. This
+ * Executor which orchestrates concurrent producers and consumers 
communicating through 'BoundedInMemoryQueue'. This
  * class takes as input the size limit, queue producer(s), consumer and 
transformer and exposes API to orchestrate
  * concurrent execution of these actors communicating through a central 
bounded queue
  */
-public class BoundedInMemoryExecutor<I, O, E> {
+public class BoundedInMemoryExecutor<I, O, E> extends HoodieExecutorBase<I, O, 
E> {
 
   private static final Logger LOG = 
LogManager.getLogger(BoundedInMemoryExecutor.class);
-  private static final long TERMINATE_WAITING_TIME_SECS = 60L;
-  // Executor service used for launching write thread.
-  private final ExecutorService producerExecutorService;
-  // Executor service used for launching read thread.
-  private final ExecutorService consumerExecutorService;
-  // Used for buffering records which is controlled by 
HoodieWriteConfig#WRITE_BUFFER_LIMIT_BYTES.
-  private final BoundedInMemoryQueue<I, O> queue;
-  // Producers
-  private final List<BoundedInMemoryQueueProducer<I>> producers;
-  // Consumer
-  private final Option<BoundedInMemoryQueueConsumer<O, E>> consumer;
-  // pre-execute function to implement environment specific behavior before 
executors (producers/consumer) run
-  private final Runnable preExecuteRunnable;
+  private final HoodieMessageQueue<I, O> queue;
 
   public BoundedInMemoryExecutor(final long bufferLimitInBytes, final 
Iterator<I> inputItr,
-                                 BoundedInMemoryQueueConsumer<O, E> consumer, 
Function<I, O> transformFunction, Runnable preExecuteRunnable) {
+                                 IteratorBasedQueueConsumer<O, E> consumer, 
Function<I, O> transformFunction, Runnable preExecuteRunnable) {
     this(bufferLimitInBytes, new IteratorBasedQueueProducer<>(inputItr), 
Option.of(consumer), transformFunction, preExecuteRunnable);
   }
 
-  public BoundedInMemoryExecutor(final long bufferLimitInBytes, 
BoundedInMemoryQueueProducer<I> producer,
-                                 Option<BoundedInMemoryQueueConsumer<O, E>> 
consumer, final Function<I, O> transformFunction) {
+  public BoundedInMemoryExecutor(final long bufferLimitInBytes, 
HoodieProducer<I> producer,
+                                 Option<IteratorBasedQueueConsumer<O, E>> 
consumer, final Function<I, O> transformFunction) {
     this(bufferLimitInBytes, producer, consumer, transformFunction, 
Functions.noop());
   }
 
-  public BoundedInMemoryExecutor(final long bufferLimitInBytes, 
BoundedInMemoryQueueProducer<I> producer,
-                                 Option<BoundedInMemoryQueueConsumer<O, E>> 
consumer, final Function<I, O> transformFunction, Runnable preExecuteRunnable) {
+  public BoundedInMemoryExecutor(final long bufferLimitInBytes, 
HoodieProducer<I> producer,
+                                 Option<IteratorBasedQueueConsumer<O, E>> 
consumer, final Function<I, O> transformFunction, Runnable preExecuteRunnable) {
     this(bufferLimitInBytes, Collections.singletonList(producer), consumer, 
transformFunction, new DefaultSizeEstimator<>(), preExecuteRunnable);
   }
 
-  public BoundedInMemoryExecutor(final long bufferLimitInBytes, 
List<BoundedInMemoryQueueProducer<I>> producers,
-                                 Option<BoundedInMemoryQueueConsumer<O, E>> 
consumer, final Function<I, O> transformFunction,
+  public BoundedInMemoryExecutor(final long bufferLimitInBytes, 
List<HoodieProducer<I>> producers,
+                                 Option<IteratorBasedQueueConsumer<O, E>> 
consumer, final Function<I, O> transformFunction,
                                  final SizeEstimator<O> sizeEstimator, 
Runnable preExecuteRunnable) {
-    this.producers = producers;
-    this.consumer = consumer;
-    this.preExecuteRunnable = preExecuteRunnable;
-    // Ensure fixed thread for each producer thread
-    this.producerExecutorService = 
Executors.newFixedThreadPool(producers.size(), new 
CustomizedThreadFactory("producer"));
-    // Ensure single thread for consumer
-    this.consumerExecutorService = Executors.newSingleThreadExecutor(new 
CustomizedThreadFactory("consumer"));
-    this.queue = new BoundedInMemoryQueue<>(bufferLimitInBytes, 
transformFunction, sizeEstimator);
+    super(producers, consumer, preExecuteRunnable);
+    this.queue = new BoundedInMemoryQueueIterable<>(bufferLimitInBytes, 
transformFunction, sizeEstimator);
   }
 
   /**
-   * Start all Producers.
+   * Start all producers at once.
    */
-  public ExecutorCompletionService<Boolean> startProducers() {
+  @Override
+  public CompletableFuture<Void> startProducers() {
     // Latch to control when and which producer thread will close the queue
     final CountDownLatch latch = new CountDownLatch(producers.size());
-    final ExecutorCompletionService<Boolean> completionService =
-        new ExecutorCompletionService<Boolean>(producerExecutorService);
-    producers.stream().map(producer -> {
-      return completionService.submit(() -> {
+
+    return CompletableFuture.allOf(producers.stream().map(producer -> {
+      return CompletableFuture.supplyAsync(() -> {
         try {
-          preExecuteRunnable.run();

Review Comment:
   Actually, we need to run preExecuteRunnable in each producer/consumer thread 
of BoundedInMemory/DisruptorQueue.
   
   The original implementation is indeed a bit confusing, so it has been 
simplified here.
   
   Expand CustomizedThreadFactory so that it runs preExecuteRunnable in each 
thread it creates, if preExecuteRunnable is not null.
   
   Also use this CustomizedThreadFactory to build producerExecutorService and 
consumerExecutorService.
   
   That way, we don't need to call preExecuteRunnable manually.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to