This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit 61ab8db4b03ed5828f9a85cb0ed5b07c6c4d5ce3
Author: Igal Shilman <[email protected]>
AuthorDate: Thu Feb 20 22:44:53 2020 +0100

    [FLINK-16063][core] Apply back pressure in FunctionGroupOperator

    This closes #29.
---
 .../flink/core/functions/FunctionGroupOperator.java | 13 +++++++++----
 1 file changed, 9 insertions(+), 4 deletions(-)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java
index 17162cc..713368e 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.state.internal.InternalListState;
 import org.apache.flink.statefun.flink.core.StatefulFunctionsConfig;
 import org.apache.flink.statefun.flink.core.StatefulFunctionsUniverse;
 import org.apache.flink.statefun.flink.core.StatefulFunctionsUniverses;
+import org.apache.flink.statefun.flink.core.backpressure.BackPressureValve;
 import org.apache.flink.statefun.flink.core.backpressure.ThresholdBackPressureValve;
 import org.apache.flink.statefun.flink.core.common.MailboxExecutorFacade;
 import org.apache.flink.statefun.flink.core.message.Message;
@@ -55,6 +56,7 @@ public class FunctionGroupOperator extends AbstractStreamOperator<Message>
   // -- runtime
   private transient Reductions reductions;
   private transient MailboxExecutor mailboxExecutor;
+  private transient BackPressureValve backPressureValve;
 
   FunctionGroupOperator(
       Map<EgressIdentifier<?>, OutputTag<Object>> sideOutputs,
@@ -72,7 +74,10 @@ public class FunctionGroupOperator extends AbstractStreamOperator<Message>
   // ------------------------------------------------------------------------------------------------------------------
 
   @Override
-  public void processElement(StreamRecord<Message> record) {
+  public void processElement(StreamRecord<Message> record) throws InterruptedException {
+    while (backPressureValve.shouldBackPressure()) {
+      mailboxExecutor.yield();
+    }
     reductions.apply(record.getValue());
   }
 
@@ -96,15 +101,15 @@ public class FunctionGroupOperator extends AbstractStreamOperator<Message>
 
     Objects.requireNonNull(mailboxExecutor, "MailboxExecutor is unexpectedly NULL");
 
-    // TODO: once FLINK-16149 would be merged, we should pass the threshold as a configuration.
-    ThresholdBackPressureValve thresholdBackPressureValve = new ThresholdBackPressureValve(1_000);
+    this.backPressureValve =
+        new ThresholdBackPressureValve(configuration.getMaxAsyncOperationsPerTask());
 
     //
     // the core logic of applying messages to functions.
     //
     this.reductions =
         Reductions.create(
-            thresholdBackPressureValve,
+            backPressureValve,
             statefulFunctionsUniverse,
             getRuntimeContext(),
             getKeyedStateBackend(),
