This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit 3bb5c652666eaaa469acab1c66bf4e9b9678c01e Author: Igal Shilman <[email protected]> AuthorDate: Thu Feb 20 22:11:49 2020 +0100 [FLINK-16063][core] Wire BackPressureValve --- .../flink/statefun/flink/core/functions/FunctionGroupOperator.java | 5 +++++ .../org/apache/flink/statefun/flink/core/functions/Reductions.java | 4 ++++ .../apache/flink/statefun/flink/core/functions/ReductionsTest.java | 2 ++ 3 files changed, 11 insertions(+) diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java index 13a8dac..17162cc 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java @@ -30,6 +30,7 @@ import org.apache.flink.runtime.state.internal.InternalListState; import org.apache.flink.statefun.flink.core.StatefulFunctionsConfig; import org.apache.flink.statefun.flink.core.StatefulFunctionsUniverse; import org.apache.flink.statefun.flink.core.StatefulFunctionsUniverses; +import org.apache.flink.statefun.flink.core.backpressure.ThresholdBackPressureValve; import org.apache.flink.statefun.flink.core.common.MailboxExecutorFacade; import org.apache.flink.statefun.flink.core.message.Message; import org.apache.flink.statefun.flink.core.message.MessageFactory; @@ -95,11 +96,15 @@ public class FunctionGroupOperator extends AbstractStreamOperator<Message> Objects.requireNonNull(mailboxExecutor, "MailboxExecutor is unexpectedly NULL"); + // TODO: once FLINK-16149 would be merged, we should pass the threshold as a configuration. + ThresholdBackPressureValve thresholdBackPressureValve = new ThresholdBackPressureValve(1_000); + // // the core logic of applying messages to functions. // this.reductions = Reductions.create( + thresholdBackPressureValve, statefulFunctionsUniverse, getRuntimeContext(), getKeyedStateBackend(), diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/Reductions.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/Reductions.java index fec2c92..7561541 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/Reductions.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/Reductions.java @@ -26,6 +26,7 @@ import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.state.KeyedStateBackend; import org.apache.flink.runtime.state.internal.InternalListState; import org.apache.flink.statefun.flink.core.StatefulFunctionsUniverse; +import org.apache.flink.statefun.flink.core.backpressure.BackPressureValve; import org.apache.flink.statefun.flink.core.di.Inject; import org.apache.flink.statefun.flink.core.di.Lazy; import org.apache.flink.statefun.flink.core.di.ObjectContainer; @@ -51,6 +52,7 @@ final class Reductions { } static Reductions create( + BackPressureValve valve, StatefulFunctionsUniverse statefulFunctionsUniverse, RuntimeContext context, KeyedStateBackend<Object> keyedStateBackend, @@ -116,6 +118,8 @@ final class Reductions { container.add("async-operations", MapState.class, asyncOperations); container.add(AsyncSink.class); + container.add("backpressure-valve", BackPressureValve.class, valve); + return container.get(Reductions.class); } diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/functions/ReductionsTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/functions/ReductionsTest.java index e279913..78687c3 100644 --- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/functions/ReductionsTest.java +++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/functions/ReductionsTest.java @@ -70,6 +70,7 @@ import org.apache.flink.runtime.state.internal.InternalListState; import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.MoreExecutors; import org.apache.flink.statefun.flink.core.StatefulFunctionsUniverse; import org.apache.flink.statefun.flink.core.TestUtils; +import org.apache.flink.statefun.flink.core.backpressure.ThresholdBackPressureValve; import org.apache.flink.statefun.flink.core.message.Message; import org.apache.flink.statefun.flink.core.message.MessageFactoryType; import org.apache.flink.streaming.api.operators.InternalTimerService; @@ -88,6 +89,7 @@ public class ReductionsTest { public void testFactory() { Reductions reductions = Reductions.create( + new ThresholdBackPressureValve(-1), new StatefulFunctionsUniverse(MessageFactoryType.WITH_KRYO_PAYLOADS), new FakeRuntimeContext(), new FakeKeyedStateBackend(),
