This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit 9b53e600e088264ff5ccec3c606a202636baa68b Author: Igal Shilman <[email protected]> AuthorDate: Mon Dec 21 14:53:45 2020 +0100 [FLINK-20699] Set FeedbackKey invocation_id explicitly This closes #190. --- .../flink/statefun/flink/core/StatefulFunctionsJob.java | 10 +++++++++- .../flink/statefun/flink/core/feedback/FeedbackKey.java | 2 +- .../statefun/flink/core/translation/FlinkUniverse.java | 11 +++++++---- .../datastream/StatefulFunctionDataStreamBuilder.java | 16 ++++++++++------ 4 files changed, 27 insertions(+), 12 deletions(-) diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java index 5f56786..ac2065e 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java @@ -21,15 +21,20 @@ import java.net.URL; import java.net.URLClassLoader; import java.util.Map; import java.util.Objects; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders; +import org.apache.flink.statefun.flink.core.feedback.FeedbackKey; +import org.apache.flink.statefun.flink.core.message.Message; import org.apache.flink.statefun.flink.core.translation.FlinkUniverse; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.FlinkUserCodeClassLoader; public class StatefulFunctionsJob { + private static final AtomicInteger FEEDBACK_INVOCATION_ID_SEQ = new AtomicInteger(); + public static void main(String... args) throws Exception { ParameterTool parameterTool = ParameterTool.fromArgs(args); Map<String, String> globalConfigurations = parameterTool.toMap(); @@ -70,7 +75,10 @@ public class StatefulFunctionsJob { new StatefulFunctionsUniverseValidator(); statefulFunctionsUniverseValidator.validate(statefulFunctionsUniverse); - FlinkUniverse flinkUniverse = new FlinkUniverse(statefulFunctionsUniverse, stateFunConfig); + FeedbackKey<Message> feedbackKey = + new FeedbackKey<>("statefun-pipeline", FEEDBACK_INVOCATION_ID_SEQ.incrementAndGet()); + FlinkUniverse flinkUniverse = + new FlinkUniverse(feedbackKey, stateFunConfig, statefulFunctionsUniverse); flinkUniverse.configure(env); env.execute(stateFunConfig.getFlinkJobName()); diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackKey.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackKey.java index caff5ee..ef6ad58 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackKey.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackKey.java @@ -34,7 +34,7 @@ public final class FeedbackKey<V> implements Serializable { } public SubtaskFeedbackKey<V> withSubTaskIndex(int subTaskIndex, int attemptId) { - return new SubtaskFeedbackKey<>(pipelineName, invocationId, attemptId, subTaskIndex); + return new SubtaskFeedbackKey<>(pipelineName, invocationId, subTaskIndex, attemptId); } @Override diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/translation/FlinkUniverse.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/translation/FlinkUniverse.java index 5c66d62..785b99e 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/translation/FlinkUniverse.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/translation/FlinkUniverse.java @@ -28,14 +28,17 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public final class FlinkUniverse { - private static final FeedbackKey<Message> FEEDBACK_KEY = - new FeedbackKey<>("statefun-pipeline", 1); private final StatefulFunctionsUniverse universe; private final StatefulFunctionsConfig configuration; + private final FeedbackKey<Message> feedbackKey; - public FlinkUniverse(StatefulFunctionsUniverse universe, StatefulFunctionsConfig configuration) { + public FlinkUniverse( + FeedbackKey<Message> feedbackKey, + StatefulFunctionsConfig configuration, + StatefulFunctionsUniverse universe) { + this.feedbackKey = Objects.requireNonNull(feedbackKey); this.universe = Objects.requireNonNull(universe); this.configuration = Objects.requireNonNull(configuration); } @@ -45,7 +48,7 @@ public final class FlinkUniverse { Sinks sinks = Sinks.create(universe); StatefulFunctionTranslator translator = - new StatefulFunctionTranslator(FEEDBACK_KEY, configuration); + new StatefulFunctionTranslator(feedbackKey, configuration); Map<EgressIdentifier<?>, DataStream<?>> sideOutputs = translator.translate(sources, sinks); diff --git a/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/StatefulFunctionDataStreamBuilder.java b/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/StatefulFunctionDataStreamBuilder.java index 58dbb92..60d3ab9 100644 --- a/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/StatefulFunctionDataStreamBuilder.java +++ b/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/StatefulFunctionDataStreamBuilder.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import javax.annotation.Nullable; import org.apache.flink.shaded.guava18.com.google.common.base.Optional; import org.apache.flink.statefun.flink.core.StatefulFunctionsConfig; @@ -46,17 +47,18 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; */ public final class StatefulFunctionDataStreamBuilder { + private static final AtomicInteger FEEDBACK_INVOCATION_ID_SEQ = new AtomicInteger(); + /** Creates a {@code StatefulFunctionDataStreamBuilder}. */ public static StatefulFunctionDataStreamBuilder builder(String pipelineName) { - FeedbackKey<Message> key = new FeedbackKey<>(pipelineName, 1); - return new StatefulFunctionDataStreamBuilder(key); + return new StatefulFunctionDataStreamBuilder(pipelineName); } - private StatefulFunctionDataStreamBuilder(FeedbackKey<Message> feedbackKey) { - this.feedbackKey = Objects.requireNonNull(feedbackKey); + private StatefulFunctionDataStreamBuilder(String pipelineName) { + this.pipelineName = Objects.requireNonNull(pipelineName); } - private final FeedbackKey<Message> feedbackKey; + private final String pipelineName; private final List<DataStream<RoutableMessage>> definedIngresses = new ArrayList<>(); private final Map<FunctionType, SerializableStatefulFunctionProvider> functionProviders = new HashMap<>(); @@ -148,7 +150,9 @@ public final class StatefulFunctionDataStreamBuilder { requestReplyFunctions.forEach( (type, unused) -> functionProviders.put(type, httpFunctionProvider)); - EmbeddedTranslator embeddedTranslator = new EmbeddedTranslator(config, feedbackKey); + FeedbackKey<Message> key = + new FeedbackKey<>(pipelineName, FEEDBACK_INVOCATION_ID_SEQ.incrementAndGet()); + EmbeddedTranslator embeddedTranslator = new EmbeddedTranslator(config, key); Map<EgressIdentifier<?>, DataStream<?>> sideOutputs = embeddedTranslator.translate(definedIngresses, egressesIds, functionProviders); return new StatefulFunctionEgressStreams(sideOutputs);
