Repository: flink
Updated Branches:
  refs/heads/master 6a86e9d62 -> 8492d9b7b


[FLINK-5250] Unwrap WrappingFunction in AbstractStreamOperator.setOutputType()

This makes InternalWindowFunction subclasses WrappingFunctions and
correctly forwards calls to setOutputType() by unwrapping
WrappingFunction in the newly added StreamingFunctionUtils.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ce023503
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ce023503
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ce023503

Branch: refs/heads/master
Commit: ce023503f6f9601c80d41766c6d59836bcb0abb6
Parents: 6a86e9d
Author: Aljoscha Krettek <[email protected]>
Authored: Fri Jan 13 12:09:58 2017 +0100
Committer: Aljoscha Krettek <[email protected]>
Committed: Fri Jan 13 22:47:24 2017 +0100

----------------------------------------------------------------------
 .../functions/util/StreamingFunctionUtils.java  | 95 ++++++++++++++++++++
 .../flink/streaming/api/graph/StreamGraph.java  |  2 +-
 .../operators/AbstractUdfStreamOperator.java    |  7 +-
 .../InternalIterableAllWindowFunction.java      | 18 +---
 .../InternalIterableWindowFunction.java         | 18 +---
 .../InternalSingleValueAllWindowFunction.java   | 18 +---
 .../InternalSingleValueWindowFunction.java      | 18 +---
 .../functions/InternalWindowFunction.java       | 17 ++--
 .../functions/InternalWindowFunctionTest.java   | 15 +++-
 9 files changed, 130 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ce023503/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/util/StreamingFunctionUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/util/StreamingFunctionUtils.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/util/StreamingFunctionUtils.java
new file mode 100644
index 0000000..f167f7f
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/util/StreamingFunctionUtils.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.operators.translation.WrappingFunction;
+import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
+import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Utility class that contains helper methods to work with Flink Streaming
+ * {@link Function Functions}. This is similar to
+ * {@link org.apache.flink.api.common.functions.util.FunctionUtils} but has 
additional methods
+ * for invoking interfaces that only exist in the streaming API.
+ */
+@Internal
+public final class StreamingFunctionUtils {
+
+       @SuppressWarnings("unchecked")
+       public static <T> void setOutputType(
+                       Function userFunction,
+                       TypeInformation<T> outTypeInfo,
+                       ExecutionConfig executionConfig) {
+
+               Preconditions.checkNotNull(outTypeInfo);
+               Preconditions.checkNotNull(executionConfig);
+
+               while (true) {
+                       if (trySetOutputType(userFunction, outTypeInfo, 
executionConfig)) {
+                               break;
+                       }
+
+                       // inspect if the user function is wrapped, then unwrap 
and try again if we can snapshot the inner function
+                       if (userFunction instanceof WrappingFunction) {
+                               userFunction = ((WrappingFunction<?>) 
userFunction).getWrappedFunction();
+                       } else {
+                               break;
+                       }
+
+               }
+       }
+
+       @SuppressWarnings("unchecked")
+       private static  <T> boolean trySetOutputType(
+                       Function userFunction,
+                       TypeInformation<T> outTypeInfo,
+                       ExecutionConfig executionConfig) {
+
+               Preconditions.checkNotNull(outTypeInfo);
+               Preconditions.checkNotNull(executionConfig);
+
+               if 
(OutputTypeConfigurable.class.isAssignableFrom(userFunction.getClass())) {
+                       ((OutputTypeConfigurable<T>) 
userFunction).setOutputType(outTypeInfo, executionConfig);
+                       return true;
+               }
+               return false;
+       }
+
+       /**
+        * Private constructor to prevent instantiation.
+        */
+       private StreamingFunctionUtils() {
+               throw new RuntimeException();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ce023503/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index c946e98..29a5ea5 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -202,7 +202,7 @@ public class StreamGraph extends StreamingPlan {
 
                setSerializers(vertexID, inSerializer, null, outSerializer);
 
-               if (operatorObject instanceof OutputTypeConfigurable) {
+               if (operatorObject instanceof OutputTypeConfigurable && 
outTypeInfo != null) {
                        @SuppressWarnings("unchecked")
                        OutputTypeConfigurable<OUT> outputTypeConfigurable = 
(OutputTypeConfigurable<OUT>) operatorObject;
                        // sets the output type which must be know at 
StreamGraph creation time

http://git-wip-us.apache.org/repos/asf/flink/blob/ce023503/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
index 15e26c9..9f67156 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
@@ -35,6 +35,7 @@ import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
+import org.apache.flink.streaming.api.functions.util.StreamingFunctionUtils;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
@@ -241,11 +242,7 @@ public abstract class AbstractUdfStreamOperator<OUT, F 
extends Function>
 
        @Override
        public void setOutputType(TypeInformation<OUT> outTypeInfo, 
ExecutionConfig executionConfig) {
-               if (userFunction instanceof OutputTypeConfigurable) {
-                       @SuppressWarnings("unchecked")
-                       OutputTypeConfigurable<OUT> outputTypeConfigurable = 
(OutputTypeConfigurable<OUT>) userFunction;
-                       outputTypeConfigurable.setOutputType(outTypeInfo, 
executionConfig);
-               }
+               StreamingFunctionUtils.setOutputType(userFunction, outTypeInfo, 
executionConfig);
        }
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ce023503/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableAllWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableAllWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableAllWindowFunction.java
index 522d3ec..b2adc94 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableAllWindowFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableAllWindowFunction.java
@@ -17,15 +17,12 @@
  */
 package org.apache.flink.streaming.runtime.operators.windowing.functions;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.IterationRuntimeContext;
-import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.operators.translation.WrappingFunction;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
-import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
 
@@ -34,14 +31,15 @@ import org.apache.flink.util.Collector;
  * when the window state also is an {@code Iterable}.
  */
 public final class InternalIterableAllWindowFunction<IN, OUT, W extends Window>
-               extends InternalWindowFunction<Iterable<IN>, OUT, Byte, W>
-               implements RichFunction {
+               extends WrappingFunction<AllWindowFunction<IN, OUT, W>>
+               implements InternalWindowFunction<Iterable<IN>, OUT, Byte, W> {
 
        private static final long serialVersionUID = 1L;
 
        protected final AllWindowFunction<IN, OUT, W> wrappedFunction;
 
        public InternalIterableAllWindowFunction(AllWindowFunction<IN, OUT, W> 
wrappedFunction) {
+               super(wrappedFunction);
                this.wrappedFunction = wrappedFunction;
        }
 
@@ -75,12 +73,4 @@ public final class InternalIterableAllWindowFunction<IN, 
OUT, W extends Window>
                throw new RuntimeException("This should never be called.");
 
        }
-
-       @SuppressWarnings("unchecked")
-       @Override
-       public void setOutputType(TypeInformation<OUT> outTypeInfo, 
ExecutionConfig executionConfig) {
-               if 
(OutputTypeConfigurable.class.isAssignableFrom(this.wrappedFunction.getClass()))
 {
-                       
((OutputTypeConfigurable<OUT>)this.wrappedFunction).setOutputType(outTypeInfo, 
executionConfig);
-               }
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ce023503/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java
index 2598557..821d40a 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java
@@ -17,15 +17,12 @@
  */
 package org.apache.flink.streaming.runtime.operators.windowing.functions;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.IterationRuntimeContext;
-import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.operators.translation.WrappingFunction;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
-import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
 
@@ -34,14 +31,15 @@ import org.apache.flink.util.Collector;
  * when the window state also is an {@code Iterable}.
  */
 public final class InternalIterableWindowFunction<IN, OUT, KEY, W extends 
Window>
-               extends InternalWindowFunction<Iterable<IN>, OUT, KEY, W>
-               implements RichFunction {
+               extends WrappingFunction<WindowFunction<IN, OUT, KEY, W>>
+               implements InternalWindowFunction<Iterable<IN>, OUT, KEY, W> {
 
        private static final long serialVersionUID = 1L;
 
        protected final WindowFunction<IN, OUT, KEY, W> wrappedFunction;
 
        public InternalIterableWindowFunction(WindowFunction<IN, OUT, KEY, W> 
wrappedFunction) {
+               super(wrappedFunction);
                this.wrappedFunction = wrappedFunction;
        }
 
@@ -75,12 +73,4 @@ public final class InternalIterableWindowFunction<IN, OUT, 
KEY, W extends Window
                throw new RuntimeException("This should never be called.");
 
        }
-
-       @SuppressWarnings("unchecked")
-       @Override
-       public void setOutputType(TypeInformation<OUT> outTypeInfo, 
ExecutionConfig executionConfig) {
-               if 
(OutputTypeConfigurable.class.isAssignableFrom(this.wrappedFunction.getClass()))
 {
-                       
((OutputTypeConfigurable<OUT>)this.wrappedFunction).setOutputType(outTypeInfo, 
executionConfig);
-               }
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ce023503/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueAllWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueAllWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueAllWindowFunction.java
index 6db711f..7cdc31c 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueAllWindowFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueAllWindowFunction.java
@@ -17,15 +17,12 @@
  */
 package org.apache.flink.streaming.runtime.operators.windowing.functions;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.IterationRuntimeContext;
-import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.operators.translation.WrappingFunction;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
-import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
 
@@ -36,14 +33,15 @@ import java.util.Collections;
  * when the window state is a single value.
  */
 public final class InternalSingleValueAllWindowFunction<IN, OUT, W extends 
Window>
-               extends InternalWindowFunction<IN, OUT, Byte, W>
-               implements RichFunction {
+               extends WrappingFunction<AllWindowFunction<IN, OUT, W>>
+               implements InternalWindowFunction<IN, OUT, Byte, W> {
 
        private static final long serialVersionUID = 1L;
 
        protected AllWindowFunction<IN, OUT, W> wrappedFunction;
 
        public InternalSingleValueAllWindowFunction(AllWindowFunction<IN, OUT, 
W> wrappedFunction) {
+               super(wrappedFunction);
                this.wrappedFunction = wrappedFunction;
        }
 
@@ -77,12 +75,4 @@ public final class InternalSingleValueAllWindowFunction<IN, 
OUT, W extends Windo
                throw new RuntimeException("This should never be called.");
 
        }
-
-       @SuppressWarnings("unchecked")
-       @Override
-       public void setOutputType(TypeInformation<OUT> outTypeInfo, 
ExecutionConfig executionConfig) {
-               if 
(OutputTypeConfigurable.class.isAssignableFrom(this.wrappedFunction.getClass()))
 {
-                       
((OutputTypeConfigurable<OUT>)this.wrappedFunction).setOutputType(outTypeInfo, 
executionConfig);
-               }
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ce023503/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueWindowFunction.java
index 727f2e4..e98872b 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueWindowFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueWindowFunction.java
@@ -17,15 +17,12 @@
  */
 package org.apache.flink.streaming.runtime.operators.windowing.functions;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.IterationRuntimeContext;
-import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.operators.translation.WrappingFunction;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
-import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
 
@@ -36,14 +33,15 @@ import java.util.Collections;
  * when the window state is a single value.
  */
 public final class InternalSingleValueWindowFunction<IN, OUT, KEY, W extends 
Window>
-               extends InternalWindowFunction<IN, OUT, KEY, W>
-               implements RichFunction {
+               extends WrappingFunction<WindowFunction<IN, OUT, KEY, W>>
+               implements InternalWindowFunction<IN, OUT, KEY, W> {
 
        private static final long serialVersionUID = 1L;
 
        protected WindowFunction<IN, OUT, KEY, W> wrappedFunction;
 
        public InternalSingleValueWindowFunction(WindowFunction<IN, OUT, KEY, 
W> wrappedFunction) {
+               super(wrappedFunction);
                this.wrappedFunction = wrappedFunction;
        }
 
@@ -77,12 +75,4 @@ public final class InternalSingleValueWindowFunction<IN, 
OUT, KEY, W extends Win
                throw new RuntimeException("This should never be called.");
 
        }
-
-       @SuppressWarnings("unchecked")
-       @Override
-       public void setOutputType(TypeInformation<OUT> outTypeInfo, 
ExecutionConfig executionConfig) {
-               if 
(OutputTypeConfigurable.class.isAssignableFrom(this.wrappedFunction.getClass()))
 {
-                       
((OutputTypeConfigurable<OUT>)this.wrappedFunction).setOutputType(outTypeInfo, 
executionConfig);
-               }
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ce023503/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java
index a7d18de..2eb4052 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java
@@ -18,12 +18,9 @@
 package org.apache.flink.streaming.runtime.operators.windowing.functions;
 
 import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
 
-import java.io.Serializable;
-
 /**
  * Internal interface for functions that are evaluated over keyed (grouped) 
windows.
  *
@@ -31,20 +28,16 @@ import java.io.Serializable;
  * @param <OUT> The type of the output value.
  * @param <KEY> The type of the key.
  */
-public abstract class InternalWindowFunction<IN, OUT, KEY, W extends Window>
-               implements Function, Serializable, OutputTypeConfigurable<OUT> {
-
-       private static final long serialVersionUID = 1L;
+public interface InternalWindowFunction<IN, OUT, KEY, W extends Window> 
extends Function {
 
        /**
         * Evaluates the window and outputs none or several elements.
         *
-        * @param key The key for which this window is evaluated.
+        * @param key    The key for which this window is evaluated.
         * @param window The window that is being evaluated.
-        * @param input The elements in the window being evaluated.
-        * @param out A collector for emitting elements.
-        *
+        * @param input  The elements in the window being evaluated.
+        * @param out    A collector for emitting elements.
         * @throws Exception The function may throw exceptions to fail the 
program and trigger recovery.
         */
-       public abstract void apply(KEY key, W window, IN input, Collector<OUT> 
out) throws Exception;
+       void apply(KEY key, W window, IN input, Collector<OUT> out) throws 
Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ce023503/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
index fae2cd0..f3c3423 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.util.StreamingFunctionUtils;
 import 
org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
 import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
@@ -52,7 +53,7 @@ public class InternalWindowFunctionTest {
                ExecutionConfig execConf = new ExecutionConfig();
                execConf.setParallelism(42);
 
-               windowFunction.setOutputType(stringType, execConf);
+               StreamingFunctionUtils.setOutputType(windowFunction, 
stringType, execConf);
                verify(mock).setOutputType(stringType, execConf);
 
                // check open
@@ -93,7 +94,7 @@ public class InternalWindowFunctionTest {
                ExecutionConfig execConf = new ExecutionConfig();
                execConf.setParallelism(42);
 
-               windowFunction.setOutputType(stringType, execConf);
+               StreamingFunctionUtils.setOutputType(windowFunction, 
stringType, execConf);
                verify(mock).setOutputType(stringType, execConf);
 
                // check open
@@ -134,7 +135,8 @@ public class InternalWindowFunctionTest {
                ExecutionConfig execConf = new ExecutionConfig();
                execConf.setParallelism(42);
 
-               windowFunction.setOutputType(stringType, execConf);
+               StreamingFunctionUtils.setOutputType(windowFunction, 
stringType, execConf);
+
                verify(mock).setOutputType(stringType, execConf);
 
                // check open
@@ -174,7 +176,8 @@ public class InternalWindowFunctionTest {
                ExecutionConfig execConf = new ExecutionConfig();
                execConf.setParallelism(42);
 
-               windowFunction.setOutputType(stringType, execConf);
+               StreamingFunctionUtils.setOutputType(windowFunction, 
stringType, execConf);
+
                verify(mock).setOutputType(stringType, execConf);
 
                // check open
@@ -205,6 +208,8 @@ public class InternalWindowFunctionTest {
                extends RichWindowFunction<Long, String, Long, TimeWindow>
                implements OutputTypeConfigurable<String> {
 
+               private static final long serialVersionUID = 1L;
+
                @Override
                public void setOutputType(TypeInformation<String> outTypeInfo, 
ExecutionConfig executionConfig) { }
 
@@ -216,6 +221,8 @@ public class InternalWindowFunctionTest {
                extends RichAllWindowFunction<Long, String, TimeWindow>
                implements OutputTypeConfigurable<String> {
 
+               private static final long serialVersionUID = 1L;
+
                @Override
                public void setOutputType(TypeInformation<String> outTypeInfo, 
ExecutionConfig executionConfig) { }
 

Reply via email to