[FLINK-3977] [dataStream] InternalWindowFunctions implement 
OutputTypeConfigurable.

- setOutputType calls are forwarded to wrapped functions.
- test added for InternalWindowFucntions.

This closes #2118


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/18744b2c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/18744b2c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/18744b2c

Branch: refs/heads/master
Commit: 18744b2c846aa51ed317c4c7409519f25e3eafb7
Parents: bce3550
Author: Fabian Hueske <[email protected]>
Authored: Thu Jun 16 09:09:40 2016 +0200
Committer: Fabian Hueske <[email protected]>
Committed: Sat Jun 18 23:40:23 2016 +0200

----------------------------------------------------------------------
 .../InternalIterableAllWindowFunction.java      |  11 +
 .../InternalIterableWindowFunction.java         |  11 +
 .../InternalSingleValueAllWindowFunction.java   |  11 +
 .../InternalSingleValueWindowFunction.java      |  11 +
 .../functions/InternalWindowFunction.java       |   5 +-
 .../functions/InternalWindowFunctionTest.java   | 225 +++++++++++++++++++
 6 files changed, 273 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/18744b2c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableAllWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableAllWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableAllWindowFunction.java
index 3a4be91..522d3ec 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableAllWindowFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableAllWindowFunction.java
@@ -17,12 +17,15 @@
  */
 package org.apache.flink.streaming.runtime.operators.windowing.functions;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.IterationRuntimeContext;
 import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
 
@@ -72,4 +75,12 @@ public final class InternalIterableAllWindowFunction<IN, 
OUT, W extends Window>
                throw new RuntimeException("This should never be called.");
 
        }
+
+       @SuppressWarnings("unchecked")
+       @Override
+       public void setOutputType(TypeInformation<OUT> outTypeInfo, 
ExecutionConfig executionConfig) {
+               if 
(OutputTypeConfigurable.class.isAssignableFrom(this.wrappedFunction.getClass()))
 {
+                       
((OutputTypeConfigurable<OUT>)this.wrappedFunction).setOutputType(outTypeInfo, 
executionConfig);
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/18744b2c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java
index 822a57c..2598557 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java
@@ -17,12 +17,15 @@
  */
 package org.apache.flink.streaming.runtime.operators.windowing.functions;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.IterationRuntimeContext;
 import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
+import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
 
@@ -72,4 +75,12 @@ public final class InternalIterableWindowFunction<IN, OUT, 
KEY, W extends Window
                throw new RuntimeException("This should never be called.");
 
        }
+
+       @SuppressWarnings("unchecked")
+       @Override
+       public void setOutputType(TypeInformation<OUT> outTypeInfo, 
ExecutionConfig executionConfig) {
+               if 
(OutputTypeConfigurable.class.isAssignableFrom(this.wrappedFunction.getClass()))
 {
+                       
((OutputTypeConfigurable<OUT>)this.wrappedFunction).setOutputType(outTypeInfo, 
executionConfig);
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/18744b2c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueAllWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueAllWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueAllWindowFunction.java
index aa6e196..6db711f 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueAllWindowFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueAllWindowFunction.java
@@ -17,12 +17,15 @@
  */
 package org.apache.flink.streaming.runtime.operators.windowing.functions;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.IterationRuntimeContext;
 import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
 
@@ -74,4 +77,12 @@ public final class InternalSingleValueAllWindowFunction<IN, 
OUT, W extends Windo
                throw new RuntimeException("This should never be called.");
 
        }
+
+       @SuppressWarnings("unchecked")
+       @Override
+       public void setOutputType(TypeInformation<OUT> outTypeInfo, 
ExecutionConfig executionConfig) {
+               if 
(OutputTypeConfigurable.class.isAssignableFrom(this.wrappedFunction.getClass()))
 {
+                       
((OutputTypeConfigurable<OUT>)this.wrappedFunction).setOutputType(outTypeInfo, 
executionConfig);
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/18744b2c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueWindowFunction.java
index 661473d..727f2e4 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueWindowFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueWindowFunction.java
@@ -17,12 +17,15 @@
  */
 package org.apache.flink.streaming.runtime.operators.windowing.functions;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.IterationRuntimeContext;
 import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
+import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
 
@@ -74,4 +77,12 @@ public final class InternalSingleValueWindowFunction<IN, 
OUT, KEY, W extends Win
                throw new RuntimeException("This should never be called.");
 
        }
+
+       @SuppressWarnings("unchecked")
+       @Override
+       public void setOutputType(TypeInformation<OUT> outTypeInfo, 
ExecutionConfig executionConfig) {
+               if 
(OutputTypeConfigurable.class.isAssignableFrom(this.wrappedFunction.getClass()))
 {
+                       
((OutputTypeConfigurable<OUT>)this.wrappedFunction).setOutputType(outTypeInfo, 
executionConfig);
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/18744b2c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java
index e75f3be..a7d18de 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.runtime.operators.windowing.functions;
 
 import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
 
@@ -30,7 +31,9 @@ import java.io.Serializable;
  * @param <OUT> The type of the output value.
  * @param <KEY> The type of the key.
  */
-public abstract class InternalWindowFunction<IN, OUT, KEY, W extends Window> 
implements Function, Serializable {
+public abstract class InternalWindowFunction<IN, OUT, KEY, W extends Window>
+               implements Function, Serializable, OutputTypeConfigurable<OUT> {
+
        private static final long serialVersionUID = 1L;
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/18744b2c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
new file mode 100644
index 0000000..fae2cd0
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.windowing.functions;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
+import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableAllWindowFunction;
+import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
+import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueAllWindowFunction;
+import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction;
+import org.apache.flink.util.Collector;
+import org.hamcrest.collection.IsIterableContainingInOrder;
+import org.junit.Test;
+
+import static org.mockito.Mockito.*;
+
+public class InternalWindowFunctionTest {
+
+       @SuppressWarnings("unchecked")
+       @Test
+       public void testInternalIterableAllWindowFunction() throws Exception {
+
+               AllWindowFunctionMock mock = mock(AllWindowFunctionMock.class);
+               InternalIterableAllWindowFunction<Long, String, TimeWindow> 
windowFunction =
+                       new InternalIterableAllWindowFunction<>(mock);
+
+               // check setOutputType
+               TypeInformation<String> stringType = 
BasicTypeInfo.STRING_TYPE_INFO;
+               ExecutionConfig execConf = new ExecutionConfig();
+               execConf.setParallelism(42);
+
+               windowFunction.setOutputType(stringType, execConf);
+               verify(mock).setOutputType(stringType, execConf);
+
+               // check open
+               Configuration config = new Configuration();
+
+               windowFunction.open(config);
+               verify(mock).open(config);
+
+               // check setRuntimeContext
+               RuntimeContext rCtx = mock(RuntimeContext.class);
+
+               windowFunction.setRuntimeContext(rCtx);
+               verify(mock).setRuntimeContext(rCtx);
+
+               // check apply
+               TimeWindow w = mock(TimeWindow.class);
+               Iterable<Long> i = (Iterable<Long>)mock(Iterable.class);
+               Collector<String> c = (Collector<String>) mock(Collector.class);
+
+               windowFunction.apply(((byte)0), w, i, c);
+               verify(mock).apply(w, i, c);
+
+               // check close
+               windowFunction.close();
+               verify(mock).close();
+       }
+
+       @SuppressWarnings("unchecked")
+       @Test
+       public void testInternalIterableWindowFunction() throws Exception {
+
+               WindowFunctionMock mock = mock(WindowFunctionMock.class);
+               InternalIterableWindowFunction<Long, String, Long, TimeWindow> 
windowFunction =
+                       new InternalIterableWindowFunction<>(mock);
+
+               // check setOutputType
+               TypeInformation<String> stringType = 
BasicTypeInfo.STRING_TYPE_INFO;
+               ExecutionConfig execConf = new ExecutionConfig();
+               execConf.setParallelism(42);
+
+               windowFunction.setOutputType(stringType, execConf);
+               verify(mock).setOutputType(stringType, execConf);
+
+               // check open
+               Configuration config = new Configuration();
+
+               windowFunction.open(config);
+               verify(mock).open(config);
+
+               // check setRuntimeContext
+               RuntimeContext rCtx = mock(RuntimeContext.class);
+
+               windowFunction.setRuntimeContext(rCtx);
+               verify(mock).setRuntimeContext(rCtx);
+
+               // check apply
+               TimeWindow w = mock(TimeWindow.class);
+               Iterable<Long> i = (Iterable<Long>)mock(Iterable.class);
+               Collector<String> c = (Collector<String>) mock(Collector.class);
+
+               windowFunction.apply(42L, w, i, c);
+               verify(mock).apply(42L, w, i, c);
+
+               // check close
+               windowFunction.close();
+               verify(mock).close();
+       }
+
+       @SuppressWarnings("unchecked")
+       @Test
+       public void testInternalSingleValueWindowFunction() throws Exception {
+
+               WindowFunctionMock mock = mock(WindowFunctionMock.class);
+               InternalSingleValueWindowFunction<Long, String, Long, 
TimeWindow> windowFunction =
+                       new InternalSingleValueWindowFunction<>(mock);
+
+               // check setOutputType
+               TypeInformation<String> stringType = 
BasicTypeInfo.STRING_TYPE_INFO;
+               ExecutionConfig execConf = new ExecutionConfig();
+               execConf.setParallelism(42);
+
+               windowFunction.setOutputType(stringType, execConf);
+               verify(mock).setOutputType(stringType, execConf);
+
+               // check open
+               Configuration config = new Configuration();
+
+               windowFunction.open(config);
+               verify(mock).open(config);
+
+               // check setRuntimeContext
+               RuntimeContext rCtx = mock(RuntimeContext.class);
+
+               windowFunction.setRuntimeContext(rCtx);
+               verify(mock).setRuntimeContext(rCtx);
+
+               // check apply
+               TimeWindow w = mock(TimeWindow.class);
+               Collector<String> c = (Collector<String>) mock(Collector.class);
+
+               windowFunction.apply(42L, w, 23L, c);
+               verify(mock).apply(eq(42L), eq(w), 
(Iterable<Long>)argThat(IsIterableContainingInOrder.contains(23L)), eq(c));
+
+               // check close
+               windowFunction.close();
+               verify(mock).close();
+       }
+
+       @SuppressWarnings("unchecked")
+       @Test
+       public void testInternalSingleValueAllWindowFunction() throws Exception 
{
+
+               AllWindowFunctionMock mock = mock(AllWindowFunctionMock.class);
+               InternalSingleValueAllWindowFunction<Long, String, TimeWindow> 
windowFunction =
+                       new InternalSingleValueAllWindowFunction<>(mock);
+
+               // check setOutputType
+               TypeInformation<String> stringType = 
BasicTypeInfo.STRING_TYPE_INFO;
+               ExecutionConfig execConf = new ExecutionConfig();
+               execConf.setParallelism(42);
+
+               windowFunction.setOutputType(stringType, execConf);
+               verify(mock).setOutputType(stringType, execConf);
+
+               // check open
+               Configuration config = new Configuration();
+
+               windowFunction.open(config);
+               verify(mock).open(config);
+
+               // check setRuntimeContext
+               RuntimeContext rCtx = mock(RuntimeContext.class);
+
+               windowFunction.setRuntimeContext(rCtx);
+               verify(mock).setRuntimeContext(rCtx);
+
+               // check apply
+               TimeWindow w = mock(TimeWindow.class);
+               Collector<String> c = (Collector<String>) mock(Collector.class);
+
+               windowFunction.apply(((byte)0), w, 23L, c);
+               verify(mock).apply(eq(w), 
(Iterable<Long>)argThat(IsIterableContainingInOrder.contains(23L)), eq(c));
+
+               // check close
+               windowFunction.close();
+               verify(mock).close();
+       }
+
+       public static class WindowFunctionMock
+               extends RichWindowFunction<Long, String, Long, TimeWindow>
+               implements OutputTypeConfigurable<String> {
+
+               @Override
+               public void setOutputType(TypeInformation<String> outTypeInfo, 
ExecutionConfig executionConfig) { }
+
+               @Override
+               public void apply(Long aLong, TimeWindow window, Iterable<Long> 
input, Collector<String> out) throws Exception { }
+       }
+
+       public static class AllWindowFunctionMock
+               extends RichAllWindowFunction<Long, String, TimeWindow>
+               implements OutputTypeConfigurable<String> {
+
+               @Override
+               public void setOutputType(TypeInformation<String> outTypeInfo, 
ExecutionConfig executionConfig) { }
+
+               @Override
+               public void apply(TimeWindow window, Iterable<Long> values, 
Collector<String> out) throws Exception { }
+       }
+}

Reply via email to