This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 9b53e600e088264ff5ccec3c606a202636baa68b
Author: Igal Shilman <[email protected]>
AuthorDate: Mon Dec 21 14:53:45 2020 +0100

    [FLINK-20699] Set FeedbackKey invocation_id explicitly
    
    This closes #190.
---
 .../flink/statefun/flink/core/StatefulFunctionsJob.java  | 10 +++++++++-
 .../flink/statefun/flink/core/feedback/FeedbackKey.java  |  2 +-
 .../statefun/flink/core/translation/FlinkUniverse.java   | 11 +++++++----
 .../datastream/StatefulFunctionDataStreamBuilder.java    | 16 ++++++++++------
 4 files changed, 27 insertions(+), 12 deletions(-)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java
index 5f56786..ac2065e 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java
@@ -21,15 +21,20 @@ import java.net.URL;
 import java.net.URLClassLoader;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.configuration.Configuration;
 import 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
+import org.apache.flink.statefun.flink.core.feedback.FeedbackKey;
+import org.apache.flink.statefun.flink.core.message.Message;
 import org.apache.flink.statefun.flink.core.translation.FlinkUniverse;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.util.FlinkUserCodeClassLoader;
 
 public class StatefulFunctionsJob {
 
+  private static final AtomicInteger FEEDBACK_INVOCATION_ID_SEQ = new 
AtomicInteger();
+
   public static void main(String... args) throws Exception {
     ParameterTool parameterTool = ParameterTool.fromArgs(args);
     Map<String, String> globalConfigurations = parameterTool.toMap();
@@ -70,7 +75,10 @@ public class StatefulFunctionsJob {
         new StatefulFunctionsUniverseValidator();
     statefulFunctionsUniverseValidator.validate(statefulFunctionsUniverse);
 
-    FlinkUniverse flinkUniverse = new FlinkUniverse(statefulFunctionsUniverse, 
stateFunConfig);
+    FeedbackKey<Message> feedbackKey =
+        new FeedbackKey<>("statefun-pipeline", 
FEEDBACK_INVOCATION_ID_SEQ.incrementAndGet());
+    FlinkUniverse flinkUniverse =
+        new FlinkUniverse(feedbackKey, stateFunConfig, 
statefulFunctionsUniverse);
     flinkUniverse.configure(env);
 
     env.execute(stateFunConfig.getFlinkJobName());
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackKey.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackKey.java
index caff5ee..ef6ad58 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackKey.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackKey.java
@@ -34,7 +34,7 @@ public final class FeedbackKey<V> implements Serializable {
   }
 
   public SubtaskFeedbackKey<V> withSubTaskIndex(int subTaskIndex, int 
attemptId) {
-    return new SubtaskFeedbackKey<>(pipelineName, invocationId, attemptId, 
subTaskIndex);
+    return new SubtaskFeedbackKey<>(pipelineName, invocationId, subTaskIndex, 
attemptId);
   }
 
   @Override
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/translation/FlinkUniverse.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/translation/FlinkUniverse.java
index 5c66d62..785b99e 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/translation/FlinkUniverse.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/translation/FlinkUniverse.java
@@ -28,14 +28,17 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
 public final class FlinkUniverse {
-  private static final FeedbackKey<Message> FEEDBACK_KEY =
-      new FeedbackKey<>("statefun-pipeline", 1);
 
   private final StatefulFunctionsUniverse universe;
 
   private final StatefulFunctionsConfig configuration;
+  private final FeedbackKey<Message> feedbackKey;
 
-  public FlinkUniverse(StatefulFunctionsUniverse universe, 
StatefulFunctionsConfig configuration) {
+  public FlinkUniverse(
+      FeedbackKey<Message> feedbackKey,
+      StatefulFunctionsConfig configuration,
+      StatefulFunctionsUniverse universe) {
+    this.feedbackKey = Objects.requireNonNull(feedbackKey);
     this.universe = Objects.requireNonNull(universe);
     this.configuration = Objects.requireNonNull(configuration);
   }
@@ -45,7 +48,7 @@ public final class FlinkUniverse {
     Sinks sinks = Sinks.create(universe);
 
     StatefulFunctionTranslator translator =
-        new StatefulFunctionTranslator(FEEDBACK_KEY, configuration);
+        new StatefulFunctionTranslator(feedbackKey, configuration);
 
     Map<EgressIdentifier<?>, DataStream<?>> sideOutputs = 
translator.translate(sources, sinks);
 
diff --git a/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/StatefulFunctionDataStreamBuilder.java b/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/StatefulFunctionDataStreamBuilder.java
index 58dbb92..60d3ab9 100644
--- a/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/StatefulFunctionDataStreamBuilder.java
+++ b/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/StatefulFunctionDataStreamBuilder.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 import javax.annotation.Nullable;
 import org.apache.flink.shaded.guava18.com.google.common.base.Optional;
 import org.apache.flink.statefun.flink.core.StatefulFunctionsConfig;
@@ -46,17 +47,18 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  */
 public final class StatefulFunctionDataStreamBuilder {
 
+  private static final AtomicInteger FEEDBACK_INVOCATION_ID_SEQ = new 
AtomicInteger();
+
   /** Creates a {@code StatefulFunctionDataStreamBuilder}. */
   public static StatefulFunctionDataStreamBuilder builder(String pipelineName) 
{
-    FeedbackKey<Message> key = new FeedbackKey<>(pipelineName, 1);
-    return new StatefulFunctionDataStreamBuilder(key);
+    return new StatefulFunctionDataStreamBuilder(pipelineName);
   }
 
-  private StatefulFunctionDataStreamBuilder(FeedbackKey<Message> feedbackKey) {
-    this.feedbackKey = Objects.requireNonNull(feedbackKey);
+  private StatefulFunctionDataStreamBuilder(String pipelineName) {
+    this.pipelineName = Objects.requireNonNull(pipelineName);
   }
 
-  private final FeedbackKey<Message> feedbackKey;
+  private final String pipelineName;
   private final List<DataStream<RoutableMessage>> definedIngresses = new 
ArrayList<>();
   private final Map<FunctionType, SerializableStatefulFunctionProvider> 
functionProviders =
       new HashMap<>();
@@ -148,7 +150,9 @@ public final class StatefulFunctionDataStreamBuilder {
     requestReplyFunctions.forEach(
         (type, unused) -> functionProviders.put(type, httpFunctionProvider));
 
-    EmbeddedTranslator embeddedTranslator = new EmbeddedTranslator(config, 
feedbackKey);
+    FeedbackKey<Message> key =
+        new FeedbackKey<>(pipelineName, 
FEEDBACK_INVOCATION_ID_SEQ.incrementAndGet());
+    EmbeddedTranslator embeddedTranslator = new EmbeddedTranslator(config, 
key);
     Map<EgressIdentifier<?>, DataStream<?>> sideOutputs =
         embeddedTranslator.translate(definedIngresses, egressesIds, 
functionProviders);
     return new StatefulFunctionEgressStreams(sideOutputs);

Reply via email to