Repository: flink Updated Branches: refs/heads/release-0.9 d7cfa55eb -> 69f858eb9
[FLINK-2257] [streaming] Properly forward rich window function calls to wrapped functions Closes #855 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/69f858eb Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/69f858eb Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/69f858eb Branch: refs/heads/release-0.9 Commit: 69f858eb99d753722a1480a32b8abf3151e11052 Parents: d7cfa55 Author: mbalassi <[email protected]> Authored: Mon Jun 22 12:45:33 2015 +0200 Committer: mbalassi <[email protected]> Committed: Thu Jun 25 13:38:56 2015 +0200 ---------------------------------------------------------------------- .../api/operators/windowing/WindowFolder.java | 23 +++++++++++++++++++- .../api/operators/windowing/WindowMapper.java | 21 ++++++++++++++++++ .../api/operators/windowing/WindowReducer.java | 22 +++++++++++++++++++ 3 files changed, 65 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/69f858eb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFolder.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFolder.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFolder.java index b8f407a..cdfc35b 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFolder.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFolder.java @@ -18,10 +18,11 @@ package org.apache.flink.streaming.api.operators.windowing; import org.apache.flink.api.common.functions.AbstractRichFunction; -import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.FoldFunction; +import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.functions.util.FunctionUtils; +import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.WindowedDataStream; import org.apache.flink.streaming.api.operators.StreamMap; import org.apache.flink.streaming.api.windowing.StreamWindow; @@ -69,11 +70,31 @@ public class WindowFolder<IN, OUT> extends StreamMap<StreamWindow<IN>, StreamWin return outputWindow; } + // -------------------------------------------------------------------------------------------- + // Forwarding calls to the wrapped folder + // -------------------------------------------------------------------------------------------- + + @Override + public void open(Configuration parameters) throws Exception { + FunctionUtils.openFunction(folder, parameters); + } + + @Override + public void close() throws Exception { + FunctionUtils.closeFunction(folder); + } + @Override public void setRuntimeContext(RuntimeContext t) { FunctionUtils.setFunctionRuntimeContext(folder, t); } + @Override + public RuntimeContext getRuntimeContext() { + return FunctionUtils.getFunctionRuntimeContext(folder, getRuntimeContext()); + } + + // streaming does not use iteration runtime context, so that is omitted } } http://git-wip-us.apache.org/repos/asf/flink/blob/69f858eb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMapper.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMapper.java index 18a237d..ec4309d 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMapper.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMapper.java @@ -21,6 +21,7 @@ import org.apache.flink.api.common.functions.AbstractRichFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.functions.util.FunctionUtils; +import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.WindowedDataStream; import org.apache.flink.streaming.api.functions.WindowMapFunction; import org.apache.flink.streaming.api.operators.StreamMap; @@ -63,11 +64,31 @@ public class WindowMapper<IN, OUT> extends StreamMap<StreamWindow<IN>, StreamWin return outputWindow; } + // -------------------------------------------------------------------------------------------- + // Forwarding calls to the wrapped mapper + // -------------------------------------------------------------------------------------------- + + @Override + public void open(Configuration parameters) throws Exception { + FunctionUtils.openFunction(mapper, parameters); + } + + @Override + public void close() throws Exception { + FunctionUtils.closeFunction(mapper); + } + @Override public void setRuntimeContext(RuntimeContext t) { FunctionUtils.setFunctionRuntimeContext(mapper, t); } + @Override + public RuntimeContext getRuntimeContext() { + return FunctionUtils.getFunctionRuntimeContext(mapper, getRuntimeContext()); + } + + // streaming does not use iteration runtime context, so that is omitted } } http://git-wip-us.apache.org/repos/asf/flink/blob/69f858eb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowReducer.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowReducer.java index ff88bab..a43405e 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowReducer.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowReducer.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.functions.util.FunctionUtils; +import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.WindowedDataStream; import org.apache.flink.streaming.api.operators.StreamMap; import org.apache.flink.streaming.api.windowing.StreamWindow; @@ -67,11 +68,32 @@ public class WindowReducer<IN> extends StreamMap<StreamWindow<IN>, StreamWindow< return outputWindow; } + // -------------------------------------------------------------------------------------------- + // Forwarding calls to the wrapped reducer + // -------------------------------------------------------------------------------------------- + + + @Override + public void open(Configuration parameters) throws Exception { + FunctionUtils.openFunction(reducer, parameters); + } + + @Override + public void close() throws Exception { + FunctionUtils.closeFunction(reducer); + } + @Override public void setRuntimeContext(RuntimeContext t) { FunctionUtils.setFunctionRuntimeContext(reducer, t); } + @Override + public RuntimeContext getRuntimeContext() { + return FunctionUtils.getFunctionRuntimeContext(reducer, getRuntimeContext()); + } + + // streaming does not use iteration runtime context, so that is omitted } }
