[FLINK-9152] Use in-class Context objects in BroadcastProcessFunction. This brings it in line with KeyedBroadcastProcessFunction, which uses context objects defined in KeyedBroadcastProcessFunction itself. The context objects here add no functionality, but for consistency we still define them here so that the methods don't refer to the base class implementations.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0838bbea Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0838bbea Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0838bbea Branch: refs/heads/master Commit: 0838bbeaafed791d96bfd1e8e1f8a5e486c35325 Parents: 584229d Author: Aljoscha Krettek <aljoscha.kret...@gmail.com> Authored: Mon Apr 9 16:12:43 2018 -0700 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Thu Apr 12 08:04:30 2018 -0700 ---------------------------------------------------------------------- .../api/functions/co/BroadcastProcessFunction.java | 12 ++++++++++++ .../operators/co/CoBroadcastWithNonKeyedOperator.java | 4 ++-- 2 files changed, 14 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/0838bbea/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/BroadcastProcessFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/BroadcastProcessFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/BroadcastProcessFunction.java index 257ea83..9e5540e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/BroadcastProcessFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/BroadcastProcessFunction.java @@ -90,4 +90,16 @@ public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadc * to fail and go into recovery. 
*/ public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception; + + /** + * A {@link BaseBroadcastProcessFunction.Context context} available to the broadcast side of + * a {@link org.apache.flink.streaming.api.datastream.BroadcastConnectedStream}. + */ + public abstract class Context extends BaseBroadcastProcessFunction.Context {} + + /** + * A {@link BaseBroadcastProcessFunction.Context context} available to the non-keyed side of + * a {@link org.apache.flink.streaming.api.datastream.BroadcastConnectedStream} (if any). + */ + public abstract class ReadOnlyContext extends BaseBroadcastProcessFunction.ReadOnlyContext {} } http://git-wip-us.apache.org/repos/asf/flink/blob/0838bbea/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithNonKeyedOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithNonKeyedOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithNonKeyedOperator.java index 7e1e431..5bed3bb 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithNonKeyedOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithNonKeyedOperator.java @@ -23,8 +23,8 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.state.BroadcastState; import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.state.ReadOnlyBroadcastState; -import org.apache.flink.streaming.api.functions.co.BaseBroadcastProcessFunction; import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction; +import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction.Context; import 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; import org.apache.flink.streaming.api.operators.InternalTimerService; import org.apache.flink.streaming.api.operators.TimestampedCollector; @@ -113,7 +113,7 @@ public class CoBroadcastWithNonKeyedOperator<IN1, IN2, OUT> currentWatermark = mark.getTimestamp(); } - private class ReadWriteContextImpl extends BaseBroadcastProcessFunction.Context { + private class ReadWriteContextImpl extends Context { private final ExecutionConfig config;