This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit 4aac946624eed8a0b9f334acad550de7e2e3ad7e Author: Igal Shilman <[email protected]> AuthorDate: Tue May 5 22:27:02 2020 +0200 [FLINK-17533] Expose checkpointId from barrier messages This commit changes the boolean method isBarrierMessage() to return an OptionalLong that would carry the checkpointId when the message is a checkpoint barrier. --- .../flink/core/feedback/FeedbackUnionOperatorFactory.java | 6 +++--- .../statefun/flink/core/functions/AsyncMessageDecorator.java | 5 +++-- .../org/apache/flink/statefun/flink/core/message/Message.java | 11 ++++++++++- .../flink/statefun/flink/core/message/ProtobufMessage.java | 9 +++++++-- .../apache/flink/statefun/flink/core/message/SdkMessage.java | 5 +++-- .../flink/statefun/flink/core/translation/FlinkUniverse.java | 7 ++++--- 6 files changed, 30 insertions(+), 13 deletions(-) diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackUnionOperatorFactory.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackUnionOperatorFactory.java index 580ad2d..f87128d 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackUnionOperatorFactory.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackUnionOperatorFactory.java @@ -18,10 +18,10 @@ package org.apache.flink.statefun.flink.core.feedback; import java.util.Objects; +import java.util.OptionalLong; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.statefun.flink.core.StatefulFunctionsConfig; import org.apache.flink.statefun.flink.core.common.SerializableFunction; -import org.apache.flink.statefun.flink.core.common.SerializablePredicate; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.*; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @@ -35,7 +35,7 @@ public final class FeedbackUnionOperatorFactory<E> private final StatefulFunctionsConfig configuration; private final FeedbackKey<E> feedbackKey; - private final SerializablePredicate<E> isBarrierMessage; + private final SerializableFunction<E, OptionalLong> isBarrierMessage; private final SerializableFunction<E, ?> keySelector; private transient MailboxExecutor mailboxExecutor; @@ -43,7 +43,7 @@ public final class FeedbackUnionOperatorFactory<E> public FeedbackUnionOperatorFactory( StatefulFunctionsConfig configuration, FeedbackKey<E> feedbackKey, - SerializablePredicate<E> isBarrierMessage, + SerializableFunction<E, OptionalLong> isBarrierMessage, SerializableFunction<E, ?> keySelector) { this.feedbackKey = Objects.requireNonNull(feedbackKey); this.isBarrierMessage = Objects.requireNonNull(isBarrierMessage); diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/AsyncMessageDecorator.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/AsyncMessageDecorator.java index ee6e869..c77adb7 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/AsyncMessageDecorator.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/AsyncMessageDecorator.java @@ -17,6 +17,7 @@ */ package org.apache.flink.statefun.flink.core.functions; +import java.util.OptionalLong; import javax.annotation.Nullable; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.statefun.flink.core.message.Message; @@ -87,8 +88,8 @@ final class AsyncMessageDecorator<T> implements Message { } @Override - public boolean isBarrierMessage() { - return false; + public OptionalLong isBarrierMessage() { + return OptionalLong.empty(); } @Override diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/Message.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/Message.java index 2e4cf99..9b0cbee 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/Message.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/Message.java @@ -18,6 +18,7 @@ package org.apache.flink.statefun.flink.core.message; import java.io.IOException; +import java.util.OptionalLong; import javax.annotation.Nullable; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.statefun.sdk.Address; @@ -31,7 +32,15 @@ public interface Message { Object payload(MessageFactory context, ClassLoader targetClassLoader); - boolean isBarrierMessage(); + /** + * isBarrierMessage - returns an empty optional for non barrier messages or wrapped checkpointId + * for barrier messages. + * + * <p>When this message represents a checkpoint barrier, this method returns an {@code Optional} + * of a checkpoint id that produced that barrier. For other types of messages (i.e. {@code + * Payload}) this method returns an empty {@code Optional}. + */ + OptionalLong isBarrierMessage(); Message copy(MessageFactory context); diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/ProtobufMessage.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/ProtobufMessage.java index 8880a5d..dabda14 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/ProtobufMessage.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/ProtobufMessage.java @@ -19,6 +19,7 @@ package org.apache.flink.statefun.flink.core.message; import java.io.IOException; import java.util.Objects; +import java.util.OptionalLong; import javax.annotation.Nullable; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.statefun.flink.core.generated.Envelope; @@ -72,8 +73,12 @@ final class ProtobufMessage implements Message { } @Override - public boolean isBarrierMessage() { - return envelope.hasCheckpoint(); + public OptionalLong isBarrierMessage() { + if (!envelope.hasCheckpoint()) { + return OptionalLong.empty(); + } + final long checkpointId = envelope.getCheckpoint().getCheckpointId(); + return OptionalLong.of(checkpointId); } @Override diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/SdkMessage.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/SdkMessage.java index cfc785c..c10f2e9 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/SdkMessage.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/SdkMessage.java @@ -19,6 +19,7 @@ package org.apache.flink.statefun.flink.core.message; import java.io.IOException; import java.util.Objects; +import java.util.OptionalLong; import javax.annotation.Nullable; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.statefun.flink.core.generated.Envelope; @@ -62,8 +63,8 @@ final class SdkMessage implements Message { } @Override - public boolean isBarrierMessage() { - return false; + public OptionalLong isBarrierMessage() { + return OptionalLong.empty(); } @Override diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/translation/FlinkUniverse.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/translation/FlinkUniverse.java index ff8df49..df61045 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/translation/FlinkUniverse.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/translation/FlinkUniverse.java @@ -19,6 +19,7 @@ package org.apache.flink.statefun.flink.core.translation; import java.util.Map; import java.util.Objects; +import java.util.OptionalLong; import java.util.function.LongFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.statefun.flink.core.StatefulFunctionsConfig; @@ -26,7 +27,6 @@ import org.apache.flink.statefun.flink.core.StatefulFunctionsJobConstants; import org.apache.flink.statefun.flink.core.StatefulFunctionsUniverse; import org.apache.flink.statefun.flink.core.common.KeyBy; import org.apache.flink.statefun.flink.core.common.SerializableFunction; -import org.apache.flink.statefun.flink.core.common.SerializablePredicate; import org.apache.flink.statefun.flink.core.feedback.FeedbackKey; import org.apache.flink.statefun.flink.core.feedback.FeedbackSinkOperator; import org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperatorFactory; @@ -123,12 +123,13 @@ public final class FlinkUniverse { c.getTransformation().setParallelism(b.getParallelism()); } - private static final class IsCheckpointBarrier implements SerializablePredicate<Message> { + private static final class IsCheckpointBarrier + implements SerializableFunction<Message, OptionalLong> { private static final long serialVersionUID = 1; @Override - public boolean test(Message message) { + public OptionalLong apply(Message message) { return message.isBarrierMessage(); } }
