Repository: flink Updated Branches: refs/heads/master 89de78c72 -> 4f8d01fba
[FLINK-7660] Support sideOutput in ProcessAllWindowFunction Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/39682c45 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/39682c45 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/39682c45 Branch: refs/heads/master Commit: 39682c456a211a773014474e696babff898a76fe Parents: 89de78c Author: Bowen Li <[email protected]> Authored: Wed Sep 27 23:09:20 2017 -0700 Committer: Aljoscha Krettek <[email protected]> Committed: Thu Oct 12 11:12:33 2017 +0200 ---------------------------------------------------------------------- .../FoldApplyProcessAllWindowFunction.java | 7 +-- .../InternalProcessApplyAllWindowContext.java | 13 ++++-- .../windowing/ProcessAllWindowFunction.java | 9 ++++ .../ReduceApplyProcessAllWindowFunction.java | 6 +-- .../operators/windowing/WindowOperator.java | 1 + .../InternalProcessAllWindowContext.java | 6 +++ .../function/ProcessAllWindowFunction.scala | 6 +++ .../ScalaProcessWindowFunctionWrapper.scala | 4 ++ .../streaming/api/scala/SideOutputITCase.scala | 45 ++++++++++++++++++++ .../streaming/runtime/SideOutputITCase.java | 37 ++++++++++++++++ 10 files changed, 121 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/39682c45/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java index 362956d..591e2af 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java @@ -106,8 +106,7 @@ public class FoldApplyProcessAllWindowFunction<W extends Window, T, ACC, R> } this.ctx.window = context.window(); - this.ctx.windowState = context.windowState(); - this.ctx.globalState = context.globalState(); + this.ctx.context = context; windowFunction.process(ctx, Collections.singletonList(result), out); } @@ -115,8 +114,7 @@ public class FoldApplyProcessAllWindowFunction<W extends Window, T, ACC, R> @Override public void clear(final Context context) throws Exception { this.ctx.window = context.window(); - this.ctx.windowState = context.windowState(); - this.ctx.globalState = context.globalState(); + this.ctx.context = context; windowFunction.clear(ctx); } @@ -136,5 +134,4 @@ public class FoldApplyProcessAllWindowFunction<W extends Window, T, ACC, R> serializedInitialValue = baos.toByteArray(); } - } http://git-wip-us.apache.org/repos/asf/flink/blob/39682c45/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyAllWindowContext.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyAllWindowContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyAllWindowContext.java index a27d71b..98557ac 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyAllWindowContext.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyAllWindowContext.java @@ -21,6 +21,7 @@ package org.apache.flink.streaming.api.functions.windowing; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.state.KeyedStateStore; import org.apache.flink.streaming.api.windowing.windows.Window; +import org.apache.flink.util.OutputTag; /** * Internal reusable context wrapper. @@ -34,8 +35,7 @@ public class InternalProcessApplyAllWindowContext<IN, OUT, W extends Window> extends ProcessAllWindowFunction<IN, OUT, W>.Context { W window; - KeyedStateStore windowState; - KeyedStateStore globalState; + ProcessAllWindowFunction.Context context; InternalProcessApplyAllWindowContext(ProcessAllWindowFunction<IN, OUT, W> function) { function.super(); @@ -48,11 +48,16 @@ public class InternalProcessApplyAllWindowContext<IN, OUT, W extends Window> @Override public KeyedStateStore windowState() { - return windowState; + return context.windowState(); } @Override public KeyedStateStore globalState() { - return globalState; + return context.globalState(); + } + + @Override + public <X> void output(OutputTag<X> outputTag, X value) { + context.output(outputTag, value); } } http://git-wip-us.apache.org/repos/asf/flink/blob/39682c45/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java index 34a37bf..f27f3c0 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.functions.AbstractRichFunction; import org.apache.flink.api.common.state.KeyedStateStore; import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.util.Collector; +import org.apache.flink.util.OutputTag; /** * Base abstract class for functions that are evaluated over non-keyed windows using a context @@ -77,5 +78,13 @@ public abstract class ProcessAllWindowFunction<IN, OUT, W extends Window> extend * State accessor for per-key global state. */ public abstract KeyedStateStore globalState(); + + /** + * Emits a record to the side output identified by the {@link OutputTag}. + * + * @param outputTag the {@code OutputTag} that identifies the side output to emit to. + * @param value The record to emit. + */ + public abstract <X> void output(OutputTag<X> outputTag, X value); } } http://git-wip-us.apache.org/repos/asf/flink/blob/39682c45/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java index 108ba9e..ee8328a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java @@ -60,8 +60,7 @@ public class ReduceApplyProcessAllWindowFunction<W extends Window, T, R> extends } this.ctx.window = context.window(); - this.ctx.windowState = context.windowState(); - this.ctx.globalState = context.globalState(); + this.ctx.context = context; windowFunction.process(ctx, Collections.singletonList(curr), out); } @@ -69,8 +68,7 @@ public class ReduceApplyProcessAllWindowFunction<W extends Window, T, R> extends @Override public void clear(final Context context) throws Exception { this.ctx.window = context.window(); - this.ctx.windowState = context.windowState(); - this.ctx.globalState = context.globalState(); + this.ctx.context = context; windowFunction.clear(ctx); } http://git-wip-us.apache.org/repos/asf/flink/blob/39682c45/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java index fd90e65..4e75345 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java @@ -775,6 +775,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> return WindowOperator.this.getKeyedStateStore(); } + @Override public <X> void output(OutputTag<X> outputTag, X value) { if (outputTag == null) { throw new IllegalArgumentException("OutputTag must not be null."); http://git-wip-us.apache.org/repos/asf/flink/blob/39682c45/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessAllWindowContext.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessAllWindowContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessAllWindowContext.java index 66ec656..f1146b9 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessAllWindowContext.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessAllWindowContext.java @@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.state.KeyedStateStore; import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction; import org.apache.flink.streaming.api.windowing.windows.Window; +import org.apache.flink.util.OutputTag; /** * Internal reusable context wrapper. @@ -55,4 +56,9 @@ public class InternalProcessAllWindowContext<IN, OUT, W extends Window> public KeyedStateStore globalState() { return internalContext.globalState(); } + + @Override + public <X> void output(OutputTag<X> outputTag, X value) { + internalContext.output(outputTag, value); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/39682c45/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala index 49911e4..b91b2a0 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala @@ -21,6 +21,7 @@ package org.apache.flink.streaming.api.scala.function import org.apache.flink.annotation.PublicEvolving import org.apache.flink.api.common.functions.AbstractRichFunction import org.apache.flink.api.common.state.KeyedStateStore +import org.apache.flink.streaming.api.scala.OutputTag import org.apache.flink.streaming.api.windowing.windows.Window import org.apache.flink.util.Collector @@ -73,6 +74,11 @@ abstract class ProcessAllWindowFunction[IN, OUT, W <: Window] * State accessor for per-key global state. */ def globalState: KeyedStateStore + + /** + * Emits a record to the side output identified by the [[OutputTag]]. + */ + def output[X](outputTag: OutputTag[X], value: X) } } http://git-wip-us.apache.org/repos/asf/flink/blob/39682c45/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala index 98b050c..9a6156d 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala @@ -127,6 +127,8 @@ final class ScalaProcessAllWindowFunctionWrapper[IN, OUT, W <: Window]( override def windowState = context.windowState() override def globalState = context.globalState() + + override def output[X](outputTag: OutputTag[X], value: X) = context.output(outputTag, value) } func.process(ctx, elements.asScala, out) } @@ -138,6 +140,8 @@ final class ScalaProcessAllWindowFunctionWrapper[IN, OUT, W <: Window]( override def windowState = context.windowState() override def globalState = context.globalState() + + override def output[X](outputTag: OutputTag[X], value: X) = context.output(outputTag, value) } func.clear(ctx) } http://git-wip-us.apache.org/repos/asf/flink/blob/39682c45/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/SideOutputITCase.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/SideOutputITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/SideOutputITCase.scala index f09323c..8e66171 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/SideOutputITCase.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/SideOutputITCase.scala @@ -280,6 +280,51 @@ class SideOutputITCase extends StreamingMultipleProgramsTestBase { assertEquals(util.Arrays.asList("sideout-1", "sideout-2", "sideout-5"), sideOutputResultSink.getResult) } + + /** + * Test ProcessAllWindowFunction side output. + */ + @Test + def testProcessAllWindowFunctionSideOutput() { + val resultSink = new TestListResultSink[String] + val sideOutputResultSink = new TestListResultSink[String] + + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setParallelism(1) + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + + val dataStream = env.fromElements(("1", 1), ("2", 2), ("5", 5), ("3", 3), ("4", 4)) + + + val sideOutputTag = OutputTag[String]("side") + + val windowOperator = dataStream + .assignTimestampsAndWatermarks(new TestAssigner) + .windowAll(TumblingEventTimeWindows.of(Time.milliseconds(1))) + .process(new ProcessAllWindowFunction[(String, Int), String, TimeWindow] { + override def process( + context: Context, + elements: Iterable[(String, Int)], + out: Collector[String]): Unit = { + for (in <- elements) { + out.collect(in._1) + context.output(sideOutputTag, "sideout-" + in._1) + } + } + }) + + windowOperator + .getSideOutput(sideOutputTag) + .addSink(sideOutputResultSink) + + windowOperator.addSink(resultSink) + + env.execute() + + assertEquals(util.Arrays.asList("1", "2", "5"), resultSink.getResult) + assertEquals(util.Arrays.asList("sideout-1", "sideout-2", "sideout-5"), + sideOutputResultSink.getResult) + } } class TestAssigner extends AssignerWithPunctuatedWatermarks[(String, Int)] { http://git-wip-us.apache.org/repos/asf/flink/blob/39682c45/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java index f74f8ff..7f3fe8b 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java @@ -29,6 +29,7 @@ import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; +import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; @@ -582,4 +583,40 @@ public class SideOutputITCase extends StreamingMultipleProgramsTestBase implemen assertEquals(Arrays.asList("sideout-1", "sideout-2", "sideout-5"), sideOutputResultSink.getSortedResult()); assertEquals(Arrays.asList(1, 2, 5), resultSink.getSortedResult()); } + + @Test + public void testProcessAllWindowFunctionSideOutput() throws Exception { + TestListResultSink<Integer> resultSink = new TestListResultSink<>(); + TestListResultSink<String> sideOutputResultSink = new TestListResultSink<>(); + + StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment(); + see.setParallelism(1); + see.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + + DataStream<Integer> dataStream = see.fromCollection(elements); + + OutputTag<String> sideOutputTag = new OutputTag<String>("side"){}; + + SingleOutputStreamOperator<Integer> windowOperator = dataStream + .assignTimestampsAndWatermarks(new TestWatermarkAssigner()) + .timeWindowAll(Time.milliseconds(1), Time.milliseconds(1)) + .process(new ProcessAllWindowFunction<Integer, Integer, TimeWindow>() { + private static final long serialVersionUID = 1L; + + @Override + public void process(Context context, Iterable<Integer> elements, Collector<Integer> out) throws Exception { + for (Integer e : elements) { + out.collect(e); + context.output(sideOutputTag, "sideout-" + String.valueOf(e)); + } + } + }); + + windowOperator.getSideOutput(sideOutputTag).addSink(sideOutputResultSink); + windowOperator.addSink(resultSink); + see.execute(); + + assertEquals(Arrays.asList("sideout-1", "sideout-2", "sideout-5"), sideOutputResultSink.getSortedResult()); + assertEquals(Arrays.asList(1, 2, 5), resultSink.getSortedResult()); + } }
