[FLINK-9152] Use in-class Context objects in BroadcastProcessFunction

This brings it in line with KeyedBroadcastProcessFunction, which uses
context objects defined in KeyedBroadcastProcessFunction. The context
objects here have no added functionality but we still define them here
so that the methods don't refer to the base class implementations for
consistency.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0838bbea
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0838bbea
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0838bbea

Branch: refs/heads/master
Commit: 0838bbeaafed791d96bfd1e8e1f8a5e486c35325
Parents: 584229d
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Authored: Mon Apr 9 16:12:43 2018 -0700
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Thu Apr 12 08:04:30 2018 -0700

----------------------------------------------------------------------
 .../api/functions/co/BroadcastProcessFunction.java      | 12 ++++++++++++
 .../operators/co/CoBroadcastWithNonKeyedOperator.java   |  4 ++--
 2 files changed, 14 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0838bbea/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/BroadcastProcessFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/BroadcastProcessFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/BroadcastProcessFunction.java
index 257ea83..9e5540e 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/BroadcastProcessFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/BroadcastProcessFunction.java
@@ -90,4 +90,16 @@ public abstract class BroadcastProcessFunction<IN1, IN2, 
OUT> extends BaseBroadc
         *                   to fail and go into recovery.
         */
        public abstract void processBroadcastElement(IN2 value, Context ctx, 
Collector<OUT> out) throws Exception;
+
+       /**
+        * A {@link BaseBroadcastProcessFunction.Context context} available to 
the broadcast side of
+        * a {@link 
org.apache.flink.streaming.api.datastream.BroadcastConnectedStream}.
+        */
+       public abstract class Context extends 
BaseBroadcastProcessFunction.Context {}
+
+       /**
+        * A {@link BaseBroadcastProcessFunction.Context context} available to 
the non-keyed side of
+        * a {@link 
org.apache.flink.streaming.api.datastream.BroadcastConnectedStream} (if any).
+        */
+       public abstract class ReadOnlyContext extends 
BaseBroadcastProcessFunction.ReadOnlyContext {}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0838bbea/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithNonKeyedOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithNonKeyedOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithNonKeyedOperator.java
index 7e1e431..5bed3bb 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithNonKeyedOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithNonKeyedOperator.java
@@ -23,8 +23,8 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.BroadcastState;
 import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
-import 
org.apache.flink.streaming.api.functions.co.BaseBroadcastProcessFunction;
 import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
+import 
org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction.Context;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.InternalTimerService;
 import org.apache.flink.streaming.api.operators.TimestampedCollector;
@@ -113,7 +113,7 @@ public class CoBroadcastWithNonKeyedOperator<IN1, IN2, OUT>
                currentWatermark = mark.getTimestamp();
        }
 
-       private class ReadWriteContextImpl extends 
BaseBroadcastProcessFunction.Context {
+       private class ReadWriteContextImpl extends Context {
 
                private final ExecutionConfig config;
 

Reply via email to