This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit af6b87c29b6b9d07444ed2c10bfc0f3b12810484 Author: Igal Shilman <[email protected]> AuthorDate: Thu Feb 20 22:14:28 2020 +0100 [FLINK-16063][core] Use BackpressureValve in AsyncSink --- .../statefun/flink/core/functions/AsyncSink.java | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/AsyncSink.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/AsyncSink.java index 6ba6d51..aa8497b 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/AsyncSink.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/AsyncSink.java @@ -23,17 +23,20 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.ThreadLocalRandom; import org.apache.flink.api.common.state.MapState; +import org.apache.flink.statefun.flink.core.backpressure.BackPressureValve; import org.apache.flink.statefun.flink.core.di.Inject; import org.apache.flink.statefun.flink.core.di.Label; import org.apache.flink.statefun.flink.core.di.Lazy; import org.apache.flink.statefun.flink.core.message.Message; import org.apache.flink.statefun.flink.core.queue.Locks; import org.apache.flink.statefun.flink.core.queue.MpscQueue; +import org.apache.flink.statefun.sdk.Address; final class AsyncSink { private final MapState<Long, Message> pendingAsyncOperations; private final Lazy<Reductions> reductions; private final Executor operatorMailbox; + private final BackPressureValve backPressureValve; private final MpscQueue<Message> completed = new MpscQueue<>(32768, Locks.jdkReentrantLock()); @@ -41,10 +44,12 @@ final class AsyncSink { AsyncSink( @Label("async-operations") MapState<Long, Message> pendingAsyncOperations, @Label("mailbox-executor") Executor operatorMailbox, - @Label("reductions") Lazy<Reductions> reductions) { + @Label("reductions") Lazy<Reductions> reductions, + @Label("backpressure-valve") BackPressureValve backPressureValve) { this.pendingAsyncOperations = Objects.requireNonNull(pendingAsyncOperations); this.reductions = Objects.requireNonNull(reductions); this.operatorMailbox = Objects.requireNonNull(operatorMailbox); + this.backPressureValve = Objects.requireNonNull(backPressureValve); } <T> void accept(Message metadata, CompletableFuture<T> future) { @@ -60,9 +65,20 @@ final class AsyncSink { } catch (Exception e) { throw new RuntimeException(e); } + backPressureValve.notifyAsyncOperationRegistered(); future.whenComplete((result, throwable) -> enqueue(metadata, futureId, result, throwable)); } + /** + * Requests to stop processing any further input for that address, as long as there is an + * uncompleted async operation (owned by @address). + * + * @param address the address + */ + void blockAddress(Address address) { + backPressureValve.blockAddress(address); + } + private <T> void enqueue(Message message, long futureId, T result, Throwable throwable) { AsyncMessageDecorator<T> decoratedMessage = new AsyncMessageDecorator<>(pendingAsyncOperations, futureId, message, result, throwable); @@ -79,6 +95,7 @@ final class AsyncSink { Reductions reductions = this.reductions.get(); Message message; while ((message = batchOfCompletedFutures.poll()) != null) { + backPressureValve.notifyAsyncOperationCompleted(message.target()); reductions.enqueue(message); } reductions.processEnvelopes();
