Repository: flink
Updated Branches:
  refs/heads/release-0.9 d7cfa55eb -> 69f858eb9


[FLINK-2257] [streaming] Properly forward rich window function calls to wrapped functions

Closes #855


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/69f858eb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/69f858eb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/69f858eb

Branch: refs/heads/release-0.9
Commit: 69f858eb99d753722a1480a32b8abf3151e11052
Parents: d7cfa55
Author: mbalassi <[email protected]>
Authored: Mon Jun 22 12:45:33 2015 +0200
Committer: mbalassi <[email protected]>
Committed: Thu Jun 25 13:38:56 2015 +0200

----------------------------------------------------------------------
 .../api/operators/windowing/WindowFolder.java   | 23 +++++++++++++++++++-
 .../api/operators/windowing/WindowMapper.java   | 21 ++++++++++++++++++
 .../api/operators/windowing/WindowReducer.java  | 22 +++++++++++++++++++
 3 files changed, 65 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/69f858eb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFolder.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFolder.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFolder.java
index b8f407a..cdfc35b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFolder.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFolder.java
@@ -18,10 +18,11 @@
 package org.apache.flink.streaming.api.operators.windowing;
 
 import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.WindowedDataStream;
 import org.apache.flink.streaming.api.operators.StreamMap;
 import org.apache.flink.streaming.api.windowing.StreamWindow;
@@ -69,11 +70,31 @@ public class WindowFolder<IN, OUT> extends StreamMap<StreamWindow<IN>, StreamWin
                        return outputWindow;
                }
 
+               // --------------------------------------------------------------------------------------------
+               //  Forwarding calls to the wrapped folder
+               // --------------------------------------------------------------------------------------------
+
+               @Override
+               public void open(Configuration parameters) throws Exception { // lifecycle hook of this wrapper; holds no logic of its own
+                       FunctionUtils.openFunction(folder, parameters); // forward open() with the unchanged parameters to the wrapped folder; presumably a no-op for non-rich functions (confirm in FunctionUtils)
+               }
+
+               @Override
+               public void close() throws Exception { // lifecycle hook of this wrapper; holds no logic of its own
+                       FunctionUtils.closeFunction(folder); // forward close() to the wrapped folder; presumably a no-op for non-rich functions (confirm in FunctionUtils)
+               }
+
                @Override
                public void setRuntimeContext(RuntimeContext t) {
+                       // Also keep the context on this wrapper, so that getRuntimeContext()
+                       // has a fallback value to hand out when the wrapped folder holds none.
+                       super.setRuntimeContext(t);
                        FunctionUtils.setFunctionRuntimeContext(folder, t);
                }
 
+               @Override
+               public RuntimeContext getRuntimeContext() {
+                       // The fallback must be read through super: an unqualified
+                       // getRuntimeContext() would re-enter this override and recurse
+                       // until the stack overflows.
+                       return FunctionUtils.getFunctionRuntimeContext(folder, super.getRuntimeContext());
+               }
+
+               // streaming does not use iteration runtime context, so that is omitted
        }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/69f858eb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMapper.java
index 18a237d..ec4309d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMapper.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMapper.java
@@ -21,6 +21,7 @@ import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.WindowedDataStream;
 import org.apache.flink.streaming.api.functions.WindowMapFunction;
 import org.apache.flink.streaming.api.operators.StreamMap;
@@ -63,11 +64,31 @@ public class WindowMapper<IN, OUT> extends StreamMap<StreamWindow<IN>, StreamWin
                        return outputWindow;
                }
 
+               // --------------------------------------------------------------------------------------------
+               //  Forwarding calls to the wrapped mapper
+               // --------------------------------------------------------------------------------------------
+
+               @Override
+               public void open(Configuration parameters) throws Exception { // lifecycle hook of this wrapper; holds no logic of its own
+                       FunctionUtils.openFunction(mapper, parameters); // forward open() with the unchanged parameters to the wrapped mapper; presumably a no-op for non-rich functions (confirm in FunctionUtils)
+               }
+
+               @Override
+               public void close() throws Exception { // lifecycle hook of this wrapper; holds no logic of its own
+                       FunctionUtils.closeFunction(mapper); // forward close() to the wrapped mapper; presumably a no-op for non-rich functions (confirm in FunctionUtils)
+               }
+
                @Override
                public void setRuntimeContext(RuntimeContext t) {
+                       // Also keep the context on this wrapper, so that getRuntimeContext()
+                       // has a fallback value to hand out when the wrapped mapper holds none.
+                       super.setRuntimeContext(t);
                        FunctionUtils.setFunctionRuntimeContext(mapper, t);
                }
 
+               @Override
+               public RuntimeContext getRuntimeContext() {
+                       // The fallback must be read through super: an unqualified
+                       // getRuntimeContext() would re-enter this override and recurse
+                       // until the stack overflows.
+                       return FunctionUtils.getFunctionRuntimeContext(mapper, super.getRuntimeContext());
+               }
+
+               // streaming does not use iteration runtime context, so that is omitted
        }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/69f858eb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowReducer.java
index ff88bab..a43405e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowReducer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowReducer.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.WindowedDataStream;
 import org.apache.flink.streaming.api.operators.StreamMap;
 import org.apache.flink.streaming.api.windowing.StreamWindow;
@@ -67,11 +68,32 @@ public class WindowReducer<IN> extends StreamMap<StreamWindow<IN>, StreamWindow<
                        return outputWindow;
                }
 
+               // --------------------------------------------------------------------------------------------
+               //  Forwarding calls to the wrapped reducer
+               // --------------------------------------------------------------------------------------------
+
+
+               @Override
+               public void open(Configuration parameters) throws Exception { // lifecycle hook of this wrapper; holds no logic of its own
+                       FunctionUtils.openFunction(reducer, parameters); // forward open() with the unchanged parameters to the wrapped reducer; presumably a no-op for non-rich functions (confirm in FunctionUtils)
+               }
+
+               @Override
+               public void close() throws Exception { // lifecycle hook of this wrapper; holds no logic of its own
+                       FunctionUtils.closeFunction(reducer); // forward close() to the wrapped reducer; presumably a no-op for non-rich functions (confirm in FunctionUtils)
+               }
+
                @Override
                public void setRuntimeContext(RuntimeContext t) {
+                       // Also keep the context on this wrapper, so that getRuntimeContext()
+                       // has a fallback value to hand out when the wrapped reducer holds none.
+                       super.setRuntimeContext(t);
                        FunctionUtils.setFunctionRuntimeContext(reducer, t);
                }
 
+               @Override
+               public RuntimeContext getRuntimeContext() {
+                       // The fallback must be read through super: an unqualified
+                       // getRuntimeContext() would re-enter this override and recurse
+                       // until the stack overflows.
+                       return FunctionUtils.getFunctionRuntimeContext(reducer, super.getRuntimeContext());
+               }
+
+               // streaming does not use iteration runtime context, so that is omitted
        }
 
 }

Reply via email to