zhangyue19921010 commented on code in PR #5416:
URL: https://github.com/apache/hudi/pull/5416#discussion_r1009223711
##########
hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java:
##########
@@ -18,117 +18,97 @@
package org.apache.hudi.common.util.queue;
-import org.apache.hudi.common.util.CustomizedThreadFactory;
import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.Functions;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.SizeEstimator;
import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
+import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
import java.util.function.Function;
-import java.util.stream.Collectors;
/**
- * Executor which orchestrates concurrent producers and consumers
communicating through a bounded in-memory queue. This
+ * Executor which orchestrates concurrent producers and consumers
communicating through 'BoundedInMemoryQueue'. This
* class takes as input the size limit, queue producer(s), consumer and
transformer and exposes API to orchestrate
* concurrent execution of these actors communicating through a central
bounded queue
*/
-public class BoundedInMemoryExecutor<I, O, E> {
+public class BoundedInMemoryExecutor<I, O, E> extends HoodieExecutorBase<I, O,
E> {
private static final Logger LOG =
LogManager.getLogger(BoundedInMemoryExecutor.class);
- private static final long TERMINATE_WAITING_TIME_SECS = 60L;
- // Executor service used for launching write thread.
- private final ExecutorService producerExecutorService;
- // Executor service used for launching read thread.
- private final ExecutorService consumerExecutorService;
- // Used for buffering records which is controlled by
HoodieWriteConfig#WRITE_BUFFER_LIMIT_BYTES.
- private final BoundedInMemoryQueue<I, O> queue;
- // Producers
- private final List<BoundedInMemoryQueueProducer<I>> producers;
- // Consumer
- private final Option<BoundedInMemoryQueueConsumer<O, E>> consumer;
- // pre-execute function to implement environment specific behavior before
executors (producers/consumer) run
- private final Runnable preExecuteRunnable;
+ private final HoodieMessageQueue<I, O> queue;
public BoundedInMemoryExecutor(final long bufferLimitInBytes, final
Iterator<I> inputItr,
- BoundedInMemoryQueueConsumer<O, E> consumer,
Function<I, O> transformFunction, Runnable preExecuteRunnable) {
+ IteratorBasedQueueConsumer<O, E> consumer,
Function<I, O> transformFunction, Runnable preExecuteRunnable) {
this(bufferLimitInBytes, new IteratorBasedQueueProducer<>(inputItr),
Option.of(consumer), transformFunction, preExecuteRunnable);
}
- public BoundedInMemoryExecutor(final long bufferLimitInBytes,
BoundedInMemoryQueueProducer<I> producer,
- Option<BoundedInMemoryQueueConsumer<O, E>>
consumer, final Function<I, O> transformFunction) {
+ public BoundedInMemoryExecutor(final long bufferLimitInBytes,
HoodieProducer<I> producer,
+ Option<IteratorBasedQueueConsumer<O, E>>
consumer, final Function<I, O> transformFunction) {
this(bufferLimitInBytes, producer, consumer, transformFunction,
Functions.noop());
}
- public BoundedInMemoryExecutor(final long bufferLimitInBytes,
BoundedInMemoryQueueProducer<I> producer,
- Option<BoundedInMemoryQueueConsumer<O, E>>
consumer, final Function<I, O> transformFunction, Runnable preExecuteRunnable) {
+ public BoundedInMemoryExecutor(final long bufferLimitInBytes,
HoodieProducer<I> producer,
+ Option<IteratorBasedQueueConsumer<O, E>>
consumer, final Function<I, O> transformFunction, Runnable preExecuteRunnable) {
this(bufferLimitInBytes, Collections.singletonList(producer), consumer,
transformFunction, new DefaultSizeEstimator<>(), preExecuteRunnable);
}
- public BoundedInMemoryExecutor(final long bufferLimitInBytes,
List<BoundedInMemoryQueueProducer<I>> producers,
- Option<BoundedInMemoryQueueConsumer<O, E>>
consumer, final Function<I, O> transformFunction,
+ public BoundedInMemoryExecutor(final long bufferLimitInBytes,
List<HoodieProducer<I>> producers,
+ Option<IteratorBasedQueueConsumer<O, E>>
consumer, final Function<I, O> transformFunction,
final SizeEstimator<O> sizeEstimator,
Runnable preExecuteRunnable) {
- this.producers = producers;
- this.consumer = consumer;
- this.preExecuteRunnable = preExecuteRunnable;
- // Ensure fixed thread for each producer thread
- this.producerExecutorService =
Executors.newFixedThreadPool(producers.size(), new
CustomizedThreadFactory("producer"));
- // Ensure single thread for consumer
- this.consumerExecutorService = Executors.newSingleThreadExecutor(new
CustomizedThreadFactory("consumer"));
- this.queue = new BoundedInMemoryQueue<>(bufferLimitInBytes,
transformFunction, sizeEstimator);
+ super(producers, consumer, preExecuteRunnable);
+ this.queue = new BoundedInMemoryQueueIterable<>(bufferLimitInBytes,
transformFunction, sizeEstimator);
}
/**
- * Start all Producers.
+ * Start all producers at once.
*/
- public ExecutorCompletionService<Boolean> startProducers() {
+ @Override
+ public CompletableFuture<Void> startProducers() {
// Latch to control when and which producer thread will close the queue
final CountDownLatch latch = new CountDownLatch(producers.size());
- final ExecutorCompletionService<Boolean> completionService =
- new ExecutorCompletionService<Boolean>(producerExecutorService);
- producers.stream().map(producer -> {
- return completionService.submit(() -> {
+
+ return CompletableFuture.allOf(producers.stream().map(producer -> {
+ return CompletableFuture.supplyAsync(() -> {
try {
- preExecuteRunnable.run();
Review Comment:
Actually, we still need to run preExecuteRunnable in the producer/consumer
threads of both BoundedInMemory and DisruptorQueue.
The original implementation is indeed a bit confusing, and it has been
simplified here.
Suggestion: extend CustomizedThreadFactory so that it runs preExecuteRunnable
in each thread it creates, if the runnable isn't null.
Then use this CustomizedThreadFactory to build both producerExecutorService and
consumerExecutorService.
That way we don't need to call preExecuteRunnable manually.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]