Repository: flink Updated Branches: refs/heads/master 6a86e9d62 -> 8492d9b7b
[FLINK-5250] Unwrap WrappingFunction in AbstractStreamOperator.setOutputType() This makes InternalWindowFunction subclasses WrappingFunctions and correctly forwards calls to setOutputType() by unwrapping WrappingFunction in the newly added StreamingFunctionUtils. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ce023503 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ce023503 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ce023503 Branch: refs/heads/master Commit: ce023503f6f9601c80d41766c6d59836bcb0abb6 Parents: 6a86e9d Author: Aljoscha Krettek <[email protected]> Authored: Fri Jan 13 12:09:58 2017 +0100 Committer: Aljoscha Krettek <[email protected]> Committed: Fri Jan 13 22:47:24 2017 +0100 ---------------------------------------------------------------------- .../functions/util/StreamingFunctionUtils.java | 95 ++++++++++++++++++++ .../flink/streaming/api/graph/StreamGraph.java | 2 +- .../operators/AbstractUdfStreamOperator.java | 7 +- .../InternalIterableAllWindowFunction.java | 18 +--- .../InternalIterableWindowFunction.java | 18 +--- .../InternalSingleValueAllWindowFunction.java | 18 +--- .../InternalSingleValueWindowFunction.java | 18 +--- .../functions/InternalWindowFunction.java | 17 ++-- .../functions/InternalWindowFunctionTest.java | 15 +++- 9 files changed, 130 insertions(+), 78 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/ce023503/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/util/StreamingFunctionUtils.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/util/StreamingFunctionUtils.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/util/StreamingFunctionUtils.java new file mode 100644 index 0000000..f167f7f 
--- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/util/StreamingFunctionUtils.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.util; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.operators.translation.WrappingFunction; +import org.apache.flink.runtime.state.DefaultOperatorStateBackend; +import org.apache.flink.runtime.state.OperatorStateBackend; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; +import org.apache.flink.streaming.api.operators.OutputTypeConfigurable; +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +/** + * Utility class that 
contains helper methods to work with Flink Streaming + * {@link Function Functions}. This is similar to + * {@link org.apache.flink.api.common.functions.util.FunctionUtils} but has additional methods + * for invoking interfaces that only exist in the streaming API. + */ +@Internal +public final class StreamingFunctionUtils { + + @SuppressWarnings("unchecked") + public static <T> void setOutputType( + Function userFunction, + TypeInformation<T> outTypeInfo, + ExecutionConfig executionConfig) { + + Preconditions.checkNotNull(outTypeInfo); + Preconditions.checkNotNull(executionConfig); + + while (true) { + if (trySetOutputType(userFunction, outTypeInfo, executionConfig)) { + break; + } + + // inspect if the user function is wrapped, then unwrap and try again to set the output type on the inner function + if (userFunction instanceof WrappingFunction) { + userFunction = ((WrappingFunction<?>) userFunction).getWrappedFunction(); + } else { + break; + } + + } + } + + @SuppressWarnings("unchecked") + private static <T> boolean trySetOutputType( + Function userFunction, + TypeInformation<T> outTypeInfo, + ExecutionConfig executionConfig) { + + Preconditions.checkNotNull(outTypeInfo); + Preconditions.checkNotNull(executionConfig); + + if (OutputTypeConfigurable.class.isAssignableFrom(userFunction.getClass())) { + ((OutputTypeConfigurable<T>) userFunction).setOutputType(outTypeInfo, executionConfig); + return true; + } + return false; + } + + /** + * Private constructor to prevent instantiation. 
+ */ + private StreamingFunctionUtils() { + throw new RuntimeException(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/ce023503/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java index c946e98..29a5ea5 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java @@ -202,7 +202,7 @@ public class StreamGraph extends StreamingPlan { setSerializers(vertexID, inSerializer, null, outSerializer); - if (operatorObject instanceof OutputTypeConfigurable) { + if (operatorObject instanceof OutputTypeConfigurable && outTypeInfo != null) { @SuppressWarnings("unchecked") OutputTypeConfigurable<OUT> outputTypeConfigurable = (OutputTypeConfigurable<OUT>) operatorObject; // sets the output type which must be know at StreamGraph creation time http://git-wip-us.apache.org/repos/asf/flink/blob/ce023503/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java index 15e26c9..9f67156 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java @@ -35,6 +35,7 @@ import org.apache.flink.streaming.api.checkpoint.Checkpointed; import 
org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring; import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; +import org.apache.flink.streaming.api.functions.util.StreamingFunctionUtils; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.StreamTask; @@ -241,11 +242,7 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function> @Override public void setOutputType(TypeInformation<OUT> outTypeInfo, ExecutionConfig executionConfig) { - if (userFunction instanceof OutputTypeConfigurable) { - @SuppressWarnings("unchecked") - OutputTypeConfigurable<OUT> outputTypeConfigurable = (OutputTypeConfigurable<OUT>) userFunction; - outputTypeConfigurable.setOutputType(outTypeInfo, executionConfig); - } + StreamingFunctionUtils.setOutputType(userFunction, outTypeInfo, executionConfig); } http://git-wip-us.apache.org/repos/asf/flink/blob/ce023503/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableAllWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableAllWindowFunction.java index 522d3ec..b2adc94 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableAllWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableAllWindowFunction.java @@ -17,15 +17,12 @@ */ package org.apache.flink.streaming.runtime.operators.windowing.functions; 
-import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.IterationRuntimeContext; -import org.apache.flink.api.common.functions.RichFunction; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.functions.util.FunctionUtils; -import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.operators.translation.WrappingFunction; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; -import org.apache.flink.streaming.api.operators.OutputTypeConfigurable; import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.util.Collector; @@ -34,14 +31,15 @@ import org.apache.flink.util.Collector; * when the window state also is an {@code Iterable}. */ public final class InternalIterableAllWindowFunction<IN, OUT, W extends Window> - extends InternalWindowFunction<Iterable<IN>, OUT, Byte, W> - implements RichFunction { + extends WrappingFunction<AllWindowFunction<IN, OUT, W>> + implements InternalWindowFunction<Iterable<IN>, OUT, Byte, W> { private static final long serialVersionUID = 1L; protected final AllWindowFunction<IN, OUT, W> wrappedFunction; public InternalIterableAllWindowFunction(AllWindowFunction<IN, OUT, W> wrappedFunction) { + super(wrappedFunction); this.wrappedFunction = wrappedFunction; } @@ -75,12 +73,4 @@ public final class InternalIterableAllWindowFunction<IN, OUT, W extends Window> throw new RuntimeException("This should never be called."); } - - @SuppressWarnings("unchecked") - @Override - public void setOutputType(TypeInformation<OUT> outTypeInfo, ExecutionConfig executionConfig) { - if (OutputTypeConfigurable.class.isAssignableFrom(this.wrappedFunction.getClass())) { - ((OutputTypeConfigurable<OUT>)this.wrappedFunction).setOutputType(outTypeInfo, executionConfig); - } - } } 
http://git-wip-us.apache.org/repos/asf/flink/blob/ce023503/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java index 2598557..821d40a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java @@ -17,15 +17,12 @@ */ package org.apache.flink.streaming.runtime.operators.windowing.functions; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.IterationRuntimeContext; -import org.apache.flink.api.common.functions.RichFunction; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.functions.util.FunctionUtils; -import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.operators.translation.WrappingFunction; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; -import org.apache.flink.streaming.api.operators.OutputTypeConfigurable; import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.util.Collector; @@ -34,14 +31,15 @@ import org.apache.flink.util.Collector; * when the window state also is an {@code Iterable}. 
*/ public final class InternalIterableWindowFunction<IN, OUT, KEY, W extends Window> - extends InternalWindowFunction<Iterable<IN>, OUT, KEY, W> - implements RichFunction { + extends WrappingFunction<WindowFunction<IN, OUT, KEY, W>> + implements InternalWindowFunction<Iterable<IN>, OUT, KEY, W> { private static final long serialVersionUID = 1L; protected final WindowFunction<IN, OUT, KEY, W> wrappedFunction; public InternalIterableWindowFunction(WindowFunction<IN, OUT, KEY, W> wrappedFunction) { + super(wrappedFunction); this.wrappedFunction = wrappedFunction; } @@ -75,12 +73,4 @@ public final class InternalIterableWindowFunction<IN, OUT, KEY, W extends Window throw new RuntimeException("This should never be called."); } - - @SuppressWarnings("unchecked") - @Override - public void setOutputType(TypeInformation<OUT> outTypeInfo, ExecutionConfig executionConfig) { - if (OutputTypeConfigurable.class.isAssignableFrom(this.wrappedFunction.getClass())) { - ((OutputTypeConfigurable<OUT>)this.wrappedFunction).setOutputType(outTypeInfo, executionConfig); - } - } } http://git-wip-us.apache.org/repos/asf/flink/blob/ce023503/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueAllWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueAllWindowFunction.java index 6db711f..7cdc31c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueAllWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueAllWindowFunction.java @@ -17,15 +17,12 @@ */ package 
org.apache.flink.streaming.runtime.operators.windowing.functions; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.IterationRuntimeContext; -import org.apache.flink.api.common.functions.RichFunction; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.functions.util.FunctionUtils; -import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.operators.translation.WrappingFunction; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; -import org.apache.flink.streaming.api.operators.OutputTypeConfigurable; import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.util.Collector; @@ -36,14 +33,15 @@ import java.util.Collections; * when the window state is a single value. */ public final class InternalSingleValueAllWindowFunction<IN, OUT, W extends Window> - extends InternalWindowFunction<IN, OUT, Byte, W> - implements RichFunction { + extends WrappingFunction<AllWindowFunction<IN, OUT, W>> + implements InternalWindowFunction<IN, OUT, Byte, W> { private static final long serialVersionUID = 1L; protected AllWindowFunction<IN, OUT, W> wrappedFunction; public InternalSingleValueAllWindowFunction(AllWindowFunction<IN, OUT, W> wrappedFunction) { + super(wrappedFunction); this.wrappedFunction = wrappedFunction; } @@ -77,12 +75,4 @@ public final class InternalSingleValueAllWindowFunction<IN, OUT, W extends Windo throw new RuntimeException("This should never be called."); } - - @SuppressWarnings("unchecked") - @Override - public void setOutputType(TypeInformation<OUT> outTypeInfo, ExecutionConfig executionConfig) { - if (OutputTypeConfigurable.class.isAssignableFrom(this.wrappedFunction.getClass())) { - ((OutputTypeConfigurable<OUT>)this.wrappedFunction).setOutputType(outTypeInfo, executionConfig); - } - } } 
http://git-wip-us.apache.org/repos/asf/flink/blob/ce023503/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueWindowFunction.java index 727f2e4..e98872b 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueWindowFunction.java @@ -17,15 +17,12 @@ */ package org.apache.flink.streaming.runtime.operators.windowing.functions; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.IterationRuntimeContext; -import org.apache.flink.api.common.functions.RichFunction; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.functions.util.FunctionUtils; -import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.operators.translation.WrappingFunction; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; -import org.apache.flink.streaming.api.operators.OutputTypeConfigurable; import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.util.Collector; @@ -36,14 +33,15 @@ import java.util.Collections; * when the window state is a single value. 
*/ public final class InternalSingleValueWindowFunction<IN, OUT, KEY, W extends Window> - extends InternalWindowFunction<IN, OUT, KEY, W> - implements RichFunction { + extends WrappingFunction<WindowFunction<IN, OUT, KEY, W>> + implements InternalWindowFunction<IN, OUT, KEY, W> { private static final long serialVersionUID = 1L; protected WindowFunction<IN, OUT, KEY, W> wrappedFunction; public InternalSingleValueWindowFunction(WindowFunction<IN, OUT, KEY, W> wrappedFunction) { + super(wrappedFunction); this.wrappedFunction = wrappedFunction; } @@ -77,12 +75,4 @@ public final class InternalSingleValueWindowFunction<IN, OUT, KEY, W extends Win throw new RuntimeException("This should never be called."); } - - @SuppressWarnings("unchecked") - @Override - public void setOutputType(TypeInformation<OUT> outTypeInfo, ExecutionConfig executionConfig) { - if (OutputTypeConfigurable.class.isAssignableFrom(this.wrappedFunction.getClass())) { - ((OutputTypeConfigurable<OUT>)this.wrappedFunction).setOutputType(outTypeInfo, executionConfig); - } - } } http://git-wip-us.apache.org/repos/asf/flink/blob/ce023503/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java index a7d18de..2eb4052 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java @@ -18,12 +18,9 @@ package org.apache.flink.streaming.runtime.operators.windowing.functions; import 
org.apache.flink.api.common.functions.Function; -import org.apache.flink.streaming.api.operators.OutputTypeConfigurable; import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.util.Collector; -import java.io.Serializable; - /** * Internal interface for functions that are evaluated over keyed (grouped) windows. * @@ -31,20 +28,16 @@ import java.io.Serializable; * @param <OUT> The type of the output value. * @param <KEY> The type of the key. */ -public abstract class InternalWindowFunction<IN, OUT, KEY, W extends Window> - implements Function, Serializable, OutputTypeConfigurable<OUT> { - - private static final long serialVersionUID = 1L; +public interface InternalWindowFunction<IN, OUT, KEY, W extends Window> extends Function { /** * Evaluates the window and outputs none or several elements. * - * @param key The key for which this window is evaluated. + * @param key The key for which this window is evaluated. * @param window The window that is being evaluated. - * @param input The elements in the window being evaluated. - * @param out A collector for emitting elements. - * + * @param input The elements in the window being evaluated. + * @param out A collector for emitting elements. * @throws Exception The function may throw exceptions to fail the program and trigger recovery. 
*/ - public abstract void apply(KEY key, W window, IN input, Collector<OUT> out) throws Exception; + void apply(KEY key, W window, IN input, Collector<OUT> out) throws Exception; } http://git-wip-us.apache.org/repos/asf/flink/blob/ce023503/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java index fae2cd0..f3c3423 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.util.StreamingFunctionUtils; import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction; import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction; import org.apache.flink.streaming.api.operators.OutputTypeConfigurable; @@ -52,7 +53,7 @@ public class InternalWindowFunctionTest { ExecutionConfig execConf = new ExecutionConfig(); execConf.setParallelism(42); - windowFunction.setOutputType(stringType, execConf); + StreamingFunctionUtils.setOutputType(windowFunction, stringType, execConf); verify(mock).setOutputType(stringType, execConf); // check open @@ -93,7 +94,7 @@ public class InternalWindowFunctionTest { ExecutionConfig execConf = new 
ExecutionConfig(); execConf.setParallelism(42); - windowFunction.setOutputType(stringType, execConf); + StreamingFunctionUtils.setOutputType(windowFunction, stringType, execConf); verify(mock).setOutputType(stringType, execConf); // check open @@ -134,7 +135,8 @@ public class InternalWindowFunctionTest { ExecutionConfig execConf = new ExecutionConfig(); execConf.setParallelism(42); - windowFunction.setOutputType(stringType, execConf); + StreamingFunctionUtils.setOutputType(windowFunction, stringType, execConf); + verify(mock).setOutputType(stringType, execConf); // check open @@ -174,7 +176,8 @@ public class InternalWindowFunctionTest { ExecutionConfig execConf = new ExecutionConfig(); execConf.setParallelism(42); - windowFunction.setOutputType(stringType, execConf); + StreamingFunctionUtils.setOutputType(windowFunction, stringType, execConf); + verify(mock).setOutputType(stringType, execConf); // check open @@ -205,6 +208,8 @@ public class InternalWindowFunctionTest { extends RichWindowFunction<Long, String, Long, TimeWindow> implements OutputTypeConfigurable<String> { + private static final long serialVersionUID = 1L; + @Override public void setOutputType(TypeInformation<String> outTypeInfo, ExecutionConfig executionConfig) { } @@ -216,6 +221,8 @@ public class InternalWindowFunctionTest { extends RichAllWindowFunction<Long, String, TimeWindow> implements OutputTypeConfigurable<String> { + private static final long serialVersionUID = 1L; + @Override public void setOutputType(TypeInformation<String> outTypeInfo, ExecutionConfig executionConfig) { }
