zhangyue19921010 commented on PR #7582:
URL: https://github.com/apache/hudi/pull/7582#issuecomment-1367801102
For example, something like this:
```
/**
 * Queue-based executor whose message queue is a {@link DisruptorMessageQueue}.
 *
 * <p>Consumption is not driven through {@link #doConsume}; instead {@link #setup()} registers the
 * consumer as the queue's handler and starts the queue.
 *
 * @param <I> type of the items supplied by the producers
 * @param <O> type of the items after the transform function is applied
 * @param <E> result type of the consumer
 */
public class DisruptorExecutor<I, O, E> extends BaseHoodieQueueBasedExecutor<I, O, E> {

  /**
   * Creates an executor fed by a single iterator. The iterator is wrapped in an
   * {@link IteratorBasedQueueProducer} and the consumer in an {@link Option}, then construction is
   * delegated to the list-based constructor.
   */
  public DisruptorExecutor(final Option<Integer> bufferSize,
                           final Iterator<I> inputItr,
                           HoodieConsumer<O, E> consumer,
                           Function<I, O> transformFunction,
                           Option<String> waitStrategy,
                           Runnable preExecuteRunnable) {
    this(
        bufferSize,
        Collections.singletonList(new IteratorBasedQueueProducer<>(inputItr)),
        Option.of(consumer),
        transformFunction,
        waitStrategy,
        preExecuteRunnable);
  }

  /**
   * Creates an executor for the given producers and optional consumer, backed by a new
   * {@link DisruptorMessageQueue} built from the buffer size, transform function, wait strategy,
   * number of producers and pre-execute runnable.
   */
  public DisruptorExecutor(final Option<Integer> bufferSize,
                           List<HoodieProducer<I>> producers,
                           Option<HoodieConsumer<O, E>> consumer,
                           final Function<I, O> transformFunction,
                           final Option<String> waitStrategy,
                           Runnable preExecuteRunnable) {
    super(
        producers,
        consumer,
        new DisruptorMessageQueue<>(
            bufferSize, transformFunction, waitStrategy, producers.size(), preExecuteRunnable),
        preExecuteRunnable);
  }

  /**
   * Intentionally empty: the consuming actions are performed inside the disruptor itself.
   */
  @Override
  protected void doConsume(HoodieMessageQueue<I, O> queue, HoodieConsumer<O, E> consumer) {
    // no-op
    // will do consuming actions in disruptor
  }

  /**
   * Prepares the disruptor queue: registers the consumer as its handler, then starts it.
   */
  @Override
  public void setup() {
    // The queue handed to the super constructor is always a DisruptorMessageQueue
    DisruptorMessageQueue<I, O> disruptor = (DisruptorMessageQueue<I, O>) queue;

    // Before we start producing, we need to set up Disruptor's queue
    disruptor.setHandlers(consumer.get());
    disruptor.start();
  }
}
```
and
```
/**
 * Hook invoked by {@code execute()} before consuming and producing are started.
 * The default implementation does nothing.
 */
public void setup() {
  // nothing to set up by default
}
/**
* Main API to run both production and consumption.
*/
@Override
public E execute() {
try {
checkState(this.consumer.isPresent());
setup();
// Start consuming/producing asynchronously
CompletableFuture<Void> consuming = startConsumingAsync();
CompletableFuture<Void> producing = startProducingAsync();
// NOTE: To properly support mode when there's no consumer, we have to
fall back
// to producing future as the trigger for us to shut down the
queue
return producing.thenCombine(consuming, (aVoid, anotherVoid) -> null)
.whenComplete((ignored, throwable) -> {
// Close the queue to release the resources
queue.close();
})
.thenApply(ignored -> consumer.get().finish())
// Block until producing and consuming both finish
.get();
} catch (Exception e) {
if (e instanceof InterruptedException) {
// In case {@code InterruptedException} was thrown, resetting the
interrupted flag
// of the thread, we reset it (to true) again to permit subsequent
handlers
// to be interrupted as well
Thread.currentThread().interrupt();
}
throw new HoodieException(e);
}
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]