[FLINK-6164] Make ProcessWindowFunction a RichFunction

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e76a0aa9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e76a0aa9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e76a0aa9

Branch: refs/heads/release-1.3
Commit: e76a0aa9e2324c4647509379fe4125d8dc576ff0
Parents: e5adf11
Author: zentol <[email protected]>
Authored: Wed May 3 15:57:05 2017 +0200
Committer: zentol <[email protected]>
Committed: Tue May 9 21:07:33 2017 +0200

----------------------------------------------------------------------
 .../FoldApplyProcessAllWindowFunction.java      |  2 +-
 .../FoldApplyProcessWindowFunction.java         |  2 +-
 .../windowing/ProcessAllWindowFunction.java     |  4 +-
 .../windowing/ProcessWindowFunction.java        |  4 +-
 .../ReduceApplyProcessAllWindowFunction.java    |  3 +-
 .../ReduceApplyProcessWindowFunction.java       |  2 +-
 .../windowing/RichProcessAllWindowFunction.java | 53 ++------------------
 .../windowing/RichProcessWindowFunction.java    | 53 ++------------------
 .../functions/InternalWindowFunctionTest.java   | 11 ++--
 ...AlignedProcessingTimeWindowOperatorTest.java |  4 +-
 .../function/ProcessAllWindowFunction.scala     |  7 ++-
 .../scala/function/ProcessWindowFunction.scala  |  6 ++-
 .../function/RichProcessAllWindowFunction.scala | 53 +-------------------
 .../function/RichProcessWindowFunction.scala    | 53 +-------------------
 .../ScalaProcessWindowFunctionWrapper.scala     | 20 +++-----
 ...ngIdentityRichProcessAllWindowFunction.scala |  4 +-
 ...ckingIdentityRichProcessWindowFunction.scala |  4 +-
 17 files changed, 45 insertions(+), 240 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e76a0aa9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java
index 8982c71..1d39252 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java
@@ -45,7 +45,7 @@ import org.apache.flink.util.Collector;
 @Internal
 @Deprecated
 public class FoldApplyProcessAllWindowFunction<W extends Window, T, ACC, R>
-       extends RichProcessAllWindowFunction<T, R, W>
+       extends ProcessAllWindowFunction<T, R, W>
        implements OutputTypeConfigurable<R> {
 
        private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/flink/blob/e76a0aa9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java
index 0e0356a..fa4fe86 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java
@@ -45,7 +45,7 @@ import org.apache.flink.util.Collector;
 @Internal
 @Deprecated
 public class FoldApplyProcessWindowFunction<K, W extends Window, T, ACC, R>
-       extends RichProcessWindowFunction<T, R, K, W>
+       extends ProcessWindowFunction<T, R, K, W>
        implements OutputTypeConfigurable<R> {
 
        private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/flink/blob/e76a0aa9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java
index 4d247a7..34a37bf 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java
@@ -19,7 +19,7 @@
 package org.apache.flink.streaming.api.functions.windowing;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.api.common.state.KeyedStateStore;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
@@ -33,7 +33,7 @@ import org.apache.flink.util.Collector;
  * @param <W> The type of {@code Window} that this window function can be 
applied on.
  */
 @PublicEvolving
-public abstract class ProcessAllWindowFunction<IN, OUT, W extends Window> 
implements Function {
+public abstract class ProcessAllWindowFunction<IN, OUT, W extends Window> 
extends AbstractRichFunction {
 
        private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e76a0aa9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java
index 2c80e9e..506b610 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java
@@ -19,7 +19,7 @@
 package org.apache.flink.streaming.api.functions.windowing;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.api.common.state.KeyedStateStore;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
@@ -34,7 +34,7 @@ import org.apache.flink.util.Collector;
  * @param <W> The type of {@code Window} that this window function can be 
applied on.
  */
 @PublicEvolving
-public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> 
implements Function {
+public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> 
extends AbstractRichFunction {
 
        private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e76a0aa9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java
index d1f9ccd..e7e6609 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java
@@ -32,8 +32,7 @@ import org.apache.flink.util.Collector;
  * {@link ReduceFunction}.
  */
 @Internal
-public class ReduceApplyProcessAllWindowFunction<W extends Window, T, R>
-       extends RichProcessAllWindowFunction<T, R, W> {
+public class ReduceApplyProcessAllWindowFunction<W extends Window, T, R> 
extends ProcessAllWindowFunction<T, R, W> {
 
        private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e76a0aa9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java
index 836726d..18037b7 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java
@@ -33,7 +33,7 @@ import org.apache.flink.util.Collector;
  */
 @Internal
 public class ReduceApplyProcessWindowFunction<K, W extends Window, T, R>
-       extends RichProcessWindowFunction<T, R, K, W> {
+       extends ProcessWindowFunction<T, R, K, W> {
 
        private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e76a0aa9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessAllWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessAllWindowFunction.java
index 1130fa5..a800870 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessAllWindowFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessAllWindowFunction.java
@@ -19,10 +19,6 @@
 package org.apache.flink.streaming.api.functions.windowing;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.functions.IterationRuntimeContext;
-import org.apache.flink.api.common.functions.RichFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 
 /**
@@ -32,53 +28,12 @@ import 
org.apache.flink.streaming.api.windowing.windows.Window;
  * @param <IN> The type of the input value.
  * @param <OUT> The type of the output value.
  * @param <W> The type of {@code Window} that this window function can be 
applied on.
+ *
+ * @deprecated use {@link ProcessAllWindowFunction} instead
  */
 @PublicEvolving
-public abstract class RichProcessAllWindowFunction<IN, OUT, W extends Window>
-               extends ProcessAllWindowFunction<IN, OUT, W>
-               implements RichFunction {
+@Deprecated
+public abstract class RichProcessAllWindowFunction<IN, OUT, W extends Window> 
extends ProcessAllWindowFunction<IN, OUT, W> {
 
        private static final long serialVersionUID = 1L;
-
-
-       // 
--------------------------------------------------------------------------------------------
-       //  Runtime context access
-       // 
--------------------------------------------------------------------------------------------
-
-       private transient RuntimeContext runtimeContext;
-
-       @Override
-       public void setRuntimeContext(RuntimeContext t) {
-               this.runtimeContext = t;
-       }
-
-       @Override
-       public RuntimeContext getRuntimeContext() {
-               if (this.runtimeContext != null) {
-                       return this.runtimeContext;
-               } else {
-                       throw new IllegalStateException("The runtime context 
has not been initialized.");
-               }
-       }
-
-       @Override
-       public IterationRuntimeContext getIterationRuntimeContext() {
-               if (this.runtimeContext == null) {
-                       throw new IllegalStateException("The runtime context 
has not been initialized.");
-               } else if (this.runtimeContext instanceof 
IterationRuntimeContext) {
-                       return (IterationRuntimeContext) this.runtimeContext;
-               } else {
-                       throw new IllegalStateException("This stub is not part 
of an iteration step function.");
-               }
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       //  Default life cycle methods
-       // 
--------------------------------------------------------------------------------------------
-
-       @Override
-       public void open(Configuration parameters) throws Exception {}
-
-       @Override
-       public void close() throws Exception {}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e76a0aa9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessWindowFunction.java
index ac55bc6..83da065 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessWindowFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessWindowFunction.java
@@ -19,10 +19,6 @@
 package org.apache.flink.streaming.api.functions.windowing;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.functions.IterationRuntimeContext;
-import org.apache.flink.api.common.functions.RichFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 
 /**
@@ -33,53 +29,12 @@ import 
org.apache.flink.streaming.api.windowing.windows.Window;
  * @param <OUT> The type of the output value.
  * @param <KEY> The type of the key.
  * @param <W> The type of {@code Window} that this window function can be 
applied on.
+ *
+ * @deprecated use {@link ProcessWindowFunction} instead
  */
 @PublicEvolving
-public abstract class RichProcessWindowFunction<IN, OUT, KEY, W extends Window>
-               extends ProcessWindowFunction<IN, OUT, KEY, W>
-               implements RichFunction {
+@Deprecated
+public abstract class RichProcessWindowFunction<IN, OUT, KEY, W extends 
Window> extends ProcessWindowFunction<IN, OUT, KEY, W> {
 
        private static final long serialVersionUID = 1L;
-
-
-       // 
--------------------------------------------------------------------------------------------
-       //  Runtime context access
-       // 
--------------------------------------------------------------------------------------------
-
-       private transient RuntimeContext runtimeContext;
-
-       @Override
-       public void setRuntimeContext(RuntimeContext t) {
-               this.runtimeContext = t;
-       }
-
-       @Override
-       public RuntimeContext getRuntimeContext() {
-               if (this.runtimeContext != null) {
-                       return this.runtimeContext;
-               } else {
-                       throw new IllegalStateException("The runtime context 
has not been initialized.");
-               }
-       }
-
-       @Override
-       public IterationRuntimeContext getIterationRuntimeContext() {
-               if (this.runtimeContext == null) {
-                       throw new IllegalStateException("The runtime context 
has not been initialized.");
-               } else if (this.runtimeContext instanceof 
IterationRuntimeContext) {
-                       return (IterationRuntimeContext) this.runtimeContext;
-               } else {
-                       throw new IllegalStateException("This stub is not part 
of an iteration step function.");
-               }
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       //  Default life cycle methods
-       // 
--------------------------------------------------------------------------------------------
-
-       @Override
-       public void open(Configuration parameters) throws Exception {}
-
-       @Override
-       public void close() throws Exception {}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e76a0aa9/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
index 4b8057f..4b0f5ab 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
@@ -24,11 +24,10 @@ import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
 import 
org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.util.functions.StreamingFunctionUtils;
 import 
org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction;
-import 
org.apache.flink.streaming.api.functions.windowing.RichProcessAllWindowFunction;
-import 
org.apache.flink.streaming.api.functions.windowing.RichProcessWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
 import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
@@ -612,7 +611,7 @@ public class InternalWindowFunctionTest {
        }
 
        public static class ProcessWindowFunctionMock
-               extends RichProcessWindowFunction<Long, String, Long, 
TimeWindow>
+               extends ProcessWindowFunction<Long, String, Long, TimeWindow>
                implements OutputTypeConfigurable<String> {
 
                private static final long serialVersionUID = 1L;
@@ -626,7 +625,7 @@ public class InternalWindowFunctionTest {
        }
 
        public static class AggregateProcessWindowFunctionMock
-                       extends RichProcessWindowFunction<Map<Long, Long>, 
String, Long, TimeWindow>
+                       extends ProcessWindowFunction<Map<Long, Long>, String, 
Long, TimeWindow>
                        implements OutputTypeConfigurable<String> {
 
                private static final long serialVersionUID = 1L;
@@ -640,7 +639,7 @@ public class InternalWindowFunctionTest {
        }
 
        public static class AggregateProcessAllWindowFunctionMock
-                       extends RichProcessAllWindowFunction<Map<Long, Long>, 
String, TimeWindow>
+                       extends ProcessAllWindowFunction<Map<Long, Long>, 
String, TimeWindow>
                        implements OutputTypeConfigurable<String> {
 
                private static final long serialVersionUID = 1L;
@@ -679,7 +678,7 @@ public class InternalWindowFunctionTest {
        }
 
        public static class ProcessAllWindowFunctionMock
-               extends RichProcessAllWindowFunction<Long, String, TimeWindow>
+               extends ProcessAllWindowFunction<Long, String, TimeWindow>
                implements OutputTypeConfigurable<String> {
 
                private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/flink/blob/e76a0aa9/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
index a8d3154..2f7e302 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
@@ -35,9 +35,7 @@ import 
org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
-import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
 import 
org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
-import 
org.apache.flink.streaming.api.functions.windowing.RichProcessWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction;
@@ -1038,7 +1036,7 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
 
        // 
------------------------------------------------------------------------
 
-       private static class StatefulFunction extends 
RichProcessWindowFunction<Integer, Integer, Integer, TimeWindow> {
+       private static class StatefulFunction extends 
ProcessWindowFunction<Integer, Integer, Integer, TimeWindow> {
 
                // we use a concurrent map here even though there is no 
concurrency, to
                // get "volatile" style access to entries

http://git-wip-us.apache.org/repos/asf/flink/blob/e76a0aa9/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala
 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala
index 2f0e48e..49911e4 100644
--- 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala
+++ 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala
@@ -18,10 +18,8 @@
 
 package org.apache.flink.streaming.api.scala.function
 
-import java.io.Serializable
-
 import org.apache.flink.annotation.PublicEvolving
-import org.apache.flink.api.common.functions.Function
+import org.apache.flink.api.common.functions.AbstractRichFunction
 import org.apache.flink.api.common.state.KeyedStateStore
 import org.apache.flink.streaming.api.windowing.windows.Window
 import org.apache.flink.util.Collector
@@ -35,7 +33,8 @@ import org.apache.flink.util.Collector
   * @tparam W The type of the window.
   */
 @PublicEvolving
-abstract class ProcessAllWindowFunction[IN, OUT, W <: Window] extends Function 
with Serializable {
+abstract class ProcessAllWindowFunction[IN, OUT, W <: Window]
+    extends AbstractRichFunction {
   /**
     * Evaluates the window and outputs none or several elements.
     *

http://git-wip-us.apache.org/repos/asf/flink/blob/e76a0aa9/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala
 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala
index bc79a26..d2075db 100644
--- 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala
+++ 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala
@@ -21,7 +21,7 @@ package org.apache.flink.streaming.api.scala.function
 import java.io.Serializable
 
 import org.apache.flink.annotation.PublicEvolving
-import org.apache.flink.api.common.functions.Function
+import org.apache.flink.api.common.functions.AbstractRichFunction
 import org.apache.flink.api.common.state.KeyedStateStore
 import org.apache.flink.streaming.api.windowing.windows.Window
 import org.apache.flink.util.Collector
@@ -36,7 +36,9 @@ import org.apache.flink.util.Collector
   * @tparam W The type of the window.
   */
 @PublicEvolving
-abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] extends 
Function with Serializable {
+abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window]
+    extends AbstractRichFunction {
+
   /**
     * Evaluates the window and outputs none or several elements.
     *

http://git-wip-us.apache.org/repos/asf/flink/blob/e76a0aa9/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessAllWindowFunction.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessAllWindowFunction.scala
 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessAllWindowFunction.scala
index 22d64a8..6edc1e6 100644
--- 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessAllWindowFunction.scala
+++ 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessAllWindowFunction.scala
@@ -18,11 +18,7 @@
 
 package org.apache.flink.streaming.api.scala.function
 
-import java.beans.Transient
-
 import org.apache.flink.annotation.Public
-import org.apache.flink.api.common.functions.{IterationRuntimeContext, 
RichFunction, RuntimeContext}
-import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.windowing.windows.Window
 
 /**
@@ -34,53 +30,8 @@ import 
org.apache.flink.streaming.api.windowing.windows.Window
   * @tparam W The type of the window.
   */
 @Public
+@deprecated("use [[ProcessAllWindowFunction]] instead")
 abstract class RichProcessAllWindowFunction[IN, OUT, W <: Window]
-    extends ProcessAllWindowFunction[IN, OUT, W]
-    with RichFunction {
-
-  @Transient
-  private var runtimeContext: RuntimeContext = null
-
-  // 
--------------------------------------------------------------------------------------------
-  //  Runtime context access
-  // 
--------------------------------------------------------------------------------------------
-
-  override def setRuntimeContext(t: RuntimeContext) {
-    this.runtimeContext = t
-  }
-
-  override def getRuntimeContext: RuntimeContext = {
-    if (this.runtimeContext != null) {
-      this.runtimeContext
-    }
-    else {
-      throw new IllegalStateException("The runtime context has not been 
initialized.")
-    }
-  }
-
-  override def getIterationRuntimeContext: IterationRuntimeContext = {
-    if (this.runtimeContext == null) {
-      throw new IllegalStateException("The runtime context has not been 
initialized.")
-    }
-    else {
-      this.runtimeContext match {
-        case iterationRuntimeContext: IterationRuntimeContext => 
iterationRuntimeContext
-        case _ =>
-          throw new IllegalStateException("This stub is not part of an 
iteration step function.")
-      }
-    }
-  }
-
-  // 
--------------------------------------------------------------------------------------------
-  //  Default life cycle methods
-  // 
--------------------------------------------------------------------------------------------
-
-  @throws[Exception]
-  override def open(parameters: Configuration) {
-  }
-
-  @throws[Exception]
-  override def close() {
-  }
+    extends ProcessAllWindowFunction[IN, OUT, W] {
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e76a0aa9/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessWindowFunction.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessWindowFunction.scala
 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessWindowFunction.scala
index 320685a..d9cd275 100644
--- 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessWindowFunction.scala
+++ 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessWindowFunction.scala
@@ -18,11 +18,7 @@
 
 package org.apache.flink.streaming.api.scala.function
 
-import java.beans.Transient
-
 import org.apache.flink.annotation.Public
-import org.apache.flink.api.common.functions.{IterationRuntimeContext, 
RichFunction, RuntimeContext}
-import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.windowing.windows.Window
 
 /**
@@ -35,53 +31,8 @@ import 
org.apache.flink.streaming.api.windowing.windows.Window
   * @tparam W The type of the window.
   */
 @Public
+@deprecated("use [[ProcessWindowFunction]] instead")
 abstract class RichProcessWindowFunction[IN, OUT, KEY, W <: Window]
-    extends ProcessWindowFunction[IN, OUT, KEY, W]
-    with RichFunction {
-
-  @Transient
-  private var runtimeContext: RuntimeContext = null
-
-  // 
--------------------------------------------------------------------------------------------
-  //  Runtime context access
-  // 
--------------------------------------------------------------------------------------------
-
-  override def setRuntimeContext(t: RuntimeContext) {
-    this.runtimeContext = t
-  }
-
-  override def getRuntimeContext: RuntimeContext = {
-    if (this.runtimeContext != null) {
-      this.runtimeContext
-    }
-    else {
-      throw new IllegalStateException("The runtime context has not been 
initialized.")
-    }
-  }
-
-  override def getIterationRuntimeContext: IterationRuntimeContext = {
-    if (this.runtimeContext == null) {
-      throw new IllegalStateException("The runtime context has not been 
initialized.")
-    }
-    else {
-      this.runtimeContext match {
-        case iterationRuntimeContext: IterationRuntimeContext => 
iterationRuntimeContext
-        case _ =>
-          throw new IllegalStateException("This stub is not part of an 
iteration step function.")
-      }
-    }
-  }
-
-  // 
--------------------------------------------------------------------------------------------
-  //  Default life cycle methods
-  // 
--------------------------------------------------------------------------------------------
-  
-  @throws[Exception]
-  override def open(parameters: Configuration) {
-  }
-
-  @throws[Exception]
-  override def close() {
-  }
+    extends ProcessWindowFunction[IN, OUT, KEY, W] {
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e76a0aa9/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
index 263373e..bc4b7dd 100644
--- 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
+++ 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
@@ -21,13 +21,9 @@ package org.apache.flink.streaming.api.scala.function.util
 import org.apache.flink.api.common.functions.RuntimeContext
 import org.apache.flink.configuration.Configuration
 import 
org.apache.flink.streaming.api.functions.windowing.{ProcessWindowFunction => 
JProcessWindowFunction}
-import 
org.apache.flink.streaming.api.functions.windowing.{RichProcessWindowFunction 
=> JRichProcessWindowFunction}
-import 
org.apache.flink.streaming.api.functions.windowing.{RichProcessAllWindowFunction
 => JRichProcessAllWindowFunction}
 import 
org.apache.flink.streaming.api.functions.windowing.{ProcessAllWindowFunction => 
JProcessAllWindowFunction}
 import org.apache.flink.streaming.api.scala.function.{ProcessWindowFunction => 
ScalaProcessWindowFunction}
 import org.apache.flink.streaming.api.scala.function.{ProcessAllWindowFunction 
=> ScalaProcessAllWindowFunction}
-import 
org.apache.flink.streaming.api.scala.function.{RichProcessWindowFunction => 
ScalaRichProcessWindowFunction}
-import 
org.apache.flink.streaming.api.scala.function.{RichProcessAllWindowFunction => 
ScalaRichProcessAllWindowFunction}
 import org.apache.flink.streaming.api.windowing.windows.Window
 import org.apache.flink.util.Collector
 
@@ -43,7 +39,7 @@ import scala.collection.JavaConverters._
   */
 final class ScalaProcessWindowFunctionWrapper[IN, OUT, KEY, W <: Window](
     private[this] val func: ScalaProcessWindowFunction[IN, OUT, KEY, W])
-    extends JRichProcessWindowFunction[IN, OUT, KEY, W] {
+    extends JProcessWindowFunction[IN, OUT, KEY, W] {
 
   override def process(
       key: KEY,
@@ -82,7 +78,7 @@ final class ScalaProcessWindowFunctionWrapper[IN, OUT, KEY, W 
<: Window](
   override def setRuntimeContext(t: RuntimeContext): Unit = {
     super.setRuntimeContext(t)
     func match {
-      case rfunc: ScalaRichProcessWindowFunction[IN, OUT, KEY, W] => 
rfunc.setRuntimeContext(t)
+      case rfunc: ScalaProcessWindowFunction[IN, OUT, KEY, W] => 
rfunc.setRuntimeContext(t)
       case _ =>
     }
   }
@@ -90,7 +86,7 @@ final class ScalaProcessWindowFunctionWrapper[IN, OUT, KEY, W 
<: Window](
   override def open(parameters: Configuration): Unit = {
     super.open(parameters)
     func match {
-      case rfunc: ScalaRichProcessWindowFunction[IN, OUT, KEY, W] => 
rfunc.open(parameters)
+      case rfunc: ScalaProcessWindowFunction[IN, OUT, KEY, W] => 
rfunc.open(parameters)
       case _ =>
     }
   }
@@ -98,7 +94,7 @@ final class ScalaProcessWindowFunctionWrapper[IN, OUT, KEY, W 
<: Window](
   override def close(): Unit = {
     super.close()
     func match {
-      case rfunc: ScalaRichProcessWindowFunction[IN, OUT, KEY, W] => 
rfunc.close()
+      case rfunc: ScalaProcessWindowFunction[IN, OUT, KEY, W] => rfunc.close()
       case _ =>
     }
   }
@@ -114,7 +110,7 @@ final class ScalaProcessWindowFunctionWrapper[IN, OUT, KEY, 
W <: Window](
   */
 final class ScalaProcessAllWindowFunctionWrapper[IN, OUT, W <: Window](
     private[this] val func: ScalaProcessAllWindowFunction[IN, OUT, W])
-    extends JRichProcessAllWindowFunction[IN, OUT, W] {
+    extends JProcessAllWindowFunction[IN, OUT, W] {
 
   override def process(
       context: JProcessAllWindowFunction[IN, OUT, W]#Context,
@@ -145,7 +141,7 @@ final class ScalaProcessAllWindowFunctionWrapper[IN, OUT, W 
<: Window](
   override def setRuntimeContext(t: RuntimeContext): Unit = {
     super.setRuntimeContext(t)
     func match {
-      case rfunc : ScalaRichProcessAllWindowFunction[IN, OUT, W] => 
rfunc.setRuntimeContext(t)
+      case rfunc : ScalaProcessAllWindowFunction[IN, OUT, W] => 
rfunc.setRuntimeContext(t)
       case _ =>
     }
   }
@@ -153,7 +149,7 @@ final class ScalaProcessAllWindowFunctionWrapper[IN, OUT, W 
<: Window](
   override def open(parameters: Configuration): Unit = {
     super.open(parameters)
     func match {
-      case rfunc : ScalaRichProcessAllWindowFunction[IN, OUT, W] => 
rfunc.open(parameters)
+      case rfunc : ScalaProcessAllWindowFunction[IN, OUT, W] => 
rfunc.open(parameters)
       case _ =>
     }
   }
@@ -161,7 +157,7 @@ final class ScalaProcessAllWindowFunctionWrapper[IN, OUT, W 
<: Window](
   override def close(): Unit = {
     super.close()
     func match {
-      case rfunc : ScalaRichProcessAllWindowFunction[IN, OUT, W] => 
rfunc.close()
+      case rfunc : ScalaProcessAllWindowFunction[IN, OUT, W] => rfunc.close()
       case _ =>
     }
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/e76a0aa9/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessAllWindowFunction.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessAllWindowFunction.scala
 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessAllWindowFunction.scala
index df005fa..146452b 100644
--- 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessAllWindowFunction.scala
+++ 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessAllWindowFunction.scala
@@ -20,13 +20,13 @@ package org.apache.flink.streaming.api.scala.testutils
 
 import org.apache.flink.api.common.functions.RuntimeContext
 import org.apache.flink.configuration.Configuration
-import 
org.apache.flink.streaming.api.scala.function.RichProcessAllWindowFunction
+import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
 import org.apache.flink.streaming.api.windowing.windows.Window
 import org.apache.flink.util.Collector
 
 
 class CheckingIdentityRichProcessAllWindowFunction[T, W <: Window]
-  extends RichProcessAllWindowFunction[T, T, W] {
+  extends ProcessAllWindowFunction[T, T, W] {
 
   override def process(context: Context, input: Iterable[T], out: 
Collector[T]): Unit = {
     for (value <- input) {

http://git-wip-us.apache.org/repos/asf/flink/blob/e76a0aa9/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessWindowFunction.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessWindowFunction.scala
 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessWindowFunction.scala
index d62f2d3..2ec179a 100644
--- 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessWindowFunction.scala
+++ 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessWindowFunction.scala
@@ -20,13 +20,13 @@ package org.apache.flink.streaming.api.scala.testutils
 
 import org.apache.flink.api.common.functions.RuntimeContext
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.streaming.api.scala.function.RichProcessWindowFunction
+import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
 import org.apache.flink.streaming.api.windowing.windows.Window
 import org.apache.flink.util.Collector
 
 
 class CheckingIdentityRichProcessWindowFunction[T, K, W <: Window]
-  extends RichProcessWindowFunction[T, T, K, W] {
+  extends ProcessWindowFunction[T, T, K, W] {
 
   override def process(key: K, context: Context, input: Iterable[T], out: 
Collector[T]): Unit = {
     for (value <- input) {

Reply via email to