This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit ac6f2530ae8ba4272e8c3c4dc38bdb499092d9c0
Author: Igal Shilman <[email protected]>
AuthorDate: Tue May 5 22:27:02 2020 +0200

    [FLINK-17533] Expose checkpointId from barrier messages
    
    This commit changes the boolean method isBarrierMessage() to
    return an OptionalLong that would carry the checkpointId when the message
    is a checkpoint barrier.
---
 .../flink/core/feedback/FeedbackUnionOperatorFactory.java     |  6 +++---
 .../statefun/flink/core/functions/AsyncMessageDecorator.java  |  5 +++--
 .../org/apache/flink/statefun/flink/core/message/Message.java | 11 ++++++++++-
 .../flink/statefun/flink/core/message/ProtobufMessage.java    |  9 +++++++--
 .../apache/flink/statefun/flink/core/message/SdkMessage.java  |  5 +++--
 .../flink/statefun/flink/core/translation/FlinkUniverse.java  |  7 ++++---
 6 files changed, 30 insertions(+), 13 deletions(-)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackUnionOperatorFactory.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackUnionOperatorFactory.java
index 580ad2d..f87128d 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackUnionOperatorFactory.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackUnionOperatorFactory.java
@@ -18,10 +18,10 @@
 package org.apache.flink.statefun.flink.core.feedback;
 
 import java.util.Objects;
+import java.util.OptionalLong;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.statefun.flink.core.StatefulFunctionsConfig;
 import org.apache.flink.statefun.flink.core.common.SerializableFunction;
-import org.apache.flink.statefun.flink.core.common.SerializablePredicate;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.*;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -35,7 +35,7 @@ public final class FeedbackUnionOperatorFactory<E>
   private final StatefulFunctionsConfig configuration;
 
   private final FeedbackKey<E> feedbackKey;
-  private final SerializablePredicate<E> isBarrierMessage;
+  private final SerializableFunction<E, OptionalLong> isBarrierMessage;
   private final SerializableFunction<E, ?> keySelector;
 
   private transient MailboxExecutor mailboxExecutor;
@@ -43,7 +43,7 @@ public final class FeedbackUnionOperatorFactory<E>
   public FeedbackUnionOperatorFactory(
       StatefulFunctionsConfig configuration,
       FeedbackKey<E> feedbackKey,
-      SerializablePredicate<E> isBarrierMessage,
+      SerializableFunction<E, OptionalLong> isBarrierMessage,
       SerializableFunction<E, ?> keySelector) {
     this.feedbackKey = Objects.requireNonNull(feedbackKey);
     this.isBarrierMessage = Objects.requireNonNull(isBarrierMessage);
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/AsyncMessageDecorator.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/AsyncMessageDecorator.java
index ee6e869..c77adb7 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/AsyncMessageDecorator.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/AsyncMessageDecorator.java
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.statefun.flink.core.functions;
 
+import java.util.OptionalLong;
 import javax.annotation.Nullable;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.statefun.flink.core.message.Message;
@@ -87,8 +88,8 @@ final class AsyncMessageDecorator<T> implements Message {
   }
 
   @Override
-  public boolean isBarrierMessage() {
-    return false;
+  public OptionalLong isBarrierMessage() {
+    return OptionalLong.empty();
   }
 
   @Override
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/Message.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/Message.java
index 2e4cf99..9b0cbee 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/Message.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/Message.java
@@ -18,6 +18,7 @@
 package org.apache.flink.statefun.flink.core.message;
 
 import java.io.IOException;
+import java.util.OptionalLong;
 import javax.annotation.Nullable;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.statefun.sdk.Address;
@@ -31,7 +32,15 @@ public interface Message {
 
   Object payload(MessageFactory context, ClassLoader targetClassLoader);
 
-  boolean isBarrierMessage();
+  /**
+   * isBarrierMessage - returns an empty optional for non barrier messages or wrapped checkpointId
+   * for barrier messages.
+   *
+   * <p>When this message represents a checkpoint barrier, this method returns an {@code Optional}
+   * of a checkpoint id that produced that barrier. For other types of messages (i.e. {@code
+   * Payload}) this method returns an empty {@code Optional}.
+   */
+  OptionalLong isBarrierMessage();
 
   Message copy(MessageFactory context);
 
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/ProtobufMessage.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/ProtobufMessage.java
index 8880a5d..dabda14 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/ProtobufMessage.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/ProtobufMessage.java
@@ -19,6 +19,7 @@ package org.apache.flink.statefun.flink.core.message;
 
 import java.io.IOException;
 import java.util.Objects;
+import java.util.OptionalLong;
 import javax.annotation.Nullable;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.statefun.flink.core.generated.Envelope;
@@ -72,8 +73,12 @@ final class ProtobufMessage implements Message {
   }
 
   @Override
-  public boolean isBarrierMessage() {
-    return envelope.hasCheckpoint();
+  public OptionalLong isBarrierMessage() {
+    if (!envelope.hasCheckpoint()) {
+      return OptionalLong.empty();
+    }
+    final long checkpointId = envelope.getCheckpoint().getCheckpointId();
+    return OptionalLong.of(checkpointId);
   }
 
   @Override
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/SdkMessage.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/SdkMessage.java
index cfc785c..c10f2e9 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/SdkMessage.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/SdkMessage.java
@@ -19,6 +19,7 @@ package org.apache.flink.statefun.flink.core.message;
 
 import java.io.IOException;
 import java.util.Objects;
+import java.util.OptionalLong;
 import javax.annotation.Nullable;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.statefun.flink.core.generated.Envelope;
@@ -62,8 +63,8 @@ final class SdkMessage implements Message {
   }
 
   @Override
-  public boolean isBarrierMessage() {
-    return false;
+  public OptionalLong isBarrierMessage() {
+    return OptionalLong.empty();
   }
 
   @Override
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/translation/FlinkUniverse.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/translation/FlinkUniverse.java
index ff8df49..df61045 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/translation/FlinkUniverse.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/translation/FlinkUniverse.java
@@ -19,6 +19,7 @@ package org.apache.flink.statefun.flink.core.translation;
 
 import java.util.Map;
 import java.util.Objects;
+import java.util.OptionalLong;
 import java.util.function.LongFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.statefun.flink.core.StatefulFunctionsConfig;
@@ -26,7 +27,6 @@ import org.apache.flink.statefun.flink.core.StatefulFunctionsJobConstants;
 import org.apache.flink.statefun.flink.core.StatefulFunctionsUniverse;
 import org.apache.flink.statefun.flink.core.common.KeyBy;
 import org.apache.flink.statefun.flink.core.common.SerializableFunction;
-import org.apache.flink.statefun.flink.core.common.SerializablePredicate;
 import org.apache.flink.statefun.flink.core.feedback.FeedbackKey;
 import org.apache.flink.statefun.flink.core.feedback.FeedbackSinkOperator;
 import org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperatorFactory;
@@ -123,12 +123,13 @@ public final class FlinkUniverse {
     c.getTransformation().setParallelism(b.getParallelism());
   }
 
-  private static final class IsCheckpointBarrier implements SerializablePredicate<Message> {
+  private static final class IsCheckpointBarrier
+      implements SerializableFunction<Message, OptionalLong> {
 
     private static final long serialVersionUID = 1;
 
     @Override
-    public boolean test(Message message) {
+    public OptionalLong apply(Message message) {
       return message.isBarrierMessage();
     }
   }

Reply via email to