Repository: flink
Updated Branches:
  refs/heads/release-1.0 603f351e2 -> ba069f35b


http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/AllWindowFunction.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/AllWindowFunction.scala
 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/AllWindowFunction.scala
new file mode 100644
index 0000000..4e77d83
--- /dev/null
+++ 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/AllWindowFunction.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.scala.function
+
+import java.io.Serializable
+
+import org.apache.flink.annotation.Public
+import org.apache.flink.api.common.functions.Function
+import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.util.Collector
+
+/**
+  * Base interface for functions that are evaluated over non-keyed windows.
+  *
+  * @tparam IN The type of the input elements contained in the window.
+  * @tparam OUT The type of the output value.
+  */
+@Public
+trait AllWindowFunction[IN, OUT, W <: Window] extends Function with 
Serializable {
+
+  /**
+    * Evaluates the window and outputs none or several elements.
+    *
+    * @param window The window (of type W) that is being evaluated.
+    * @param input  The elements in the window being evaluated.
+    * @param out    A collector for emitting elements.
+    * @throws Exception The function may throw exceptions to fail the program 
and trigger recovery.
+    */
+  def apply(window: W, input: Iterable[IN], out: Collector[OUT])
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/WindowFunction.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/WindowFunction.scala
 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/WindowFunction.scala
new file mode 100644
index 0000000..67236b7
--- /dev/null
+++ 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/WindowFunction.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.scala.function
+
+import java.io.Serializable
+
+import org.apache.flink.annotation.Public
+import org.apache.flink.api.common.functions.Function
+import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.util.Collector
+
+/**
+  * Base interface for functions that are evaluated over keyed (grouped) 
windows.
+  *
+  * @tparam IN The type of the input elements contained in the window.
+  * @tparam OUT The type of the output value.
+  * @tparam KEY The type of the key passed to `apply`.
+  */
+@Public
+trait WindowFunction[IN, OUT, KEY, W <: Window] extends Function with 
Serializable {
+
+  /**
+    * Evaluates the window and outputs none or several elements.
+    *
+    * @param key    The key for which this window is evaluated.
+    * @param window The window (of type W) that is being evaluated.
+    * @param input  The elements in the window being evaluated.
+    * @param out    A collector for emitting elements.
+    * @throws Exception The function may throw exceptions to fail the program 
and trigger recovery.
+    */
+  def apply(key: KEY, window: W, input: Iterable[IN], out: Collector[OUT])
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
index dcdfa91..f73307c 100644
--- 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
+++ 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
@@ -25,7 +25,7 @@ import org.apache.flink.api.common.functions.ReduceFunction
 import org.apache.flink.api.common.state.ReducingStateDescriptor
 import org.apache.flink.api.java.tuple.Tuple
 import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.functions.windowing.{WindowFunction, 
AllWindowFunction}
+import org.apache.flink.streaming.api.scala.function.{WindowFunction, 
AllWindowFunction}
 import org.apache.flink.streaming.api.transformations.OneInputTransformation
 import 
org.apache.flink.streaming.api.windowing.assigners.{SlidingProcessingTimeWindows,
 TumblingTimeWindows, SlidingTimeWindows}
 import org.apache.flink.streaming.api.windowing.evictors.{CountEvictor, 
TimeEvictor}
@@ -76,7 +76,7 @@ class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
       .windowAll(SlidingTimeWindows.of(
         Time.of(1, TimeUnit.SECONDS),
         Time.of(100, TimeUnit.MILLISECONDS)))
-      .apply(new AllWindowFunction[Iterable[(String, Int)], (String, Int), 
TimeWindow]() {
+      .apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() 
{
         def apply(
             window: TimeWindow,
             values: Iterable[(String, Int)],
@@ -122,7 +122,7 @@ class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
     val window2 = source
       .windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
       .trigger(CountTrigger.of(100))
-      .apply(new AllWindowFunction[Iterable[(String, Int)], (String, Int), 
TimeWindow]() {
+      .apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() 
{
       def apply(
                     window: TimeWindow,
                     values: Iterable[(String, Int)],
@@ -173,7 +173,7 @@ class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
       .windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
       .trigger(CountTrigger.of(100))
       .evictor(CountEvictor.of(1000))
-      .apply(new AllWindowFunction[Iterable[(String, Int)], (String, Int), 
TimeWindow]() {
+      .apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() 
{
       def apply(
                     window: TimeWindow,
                     values: Iterable[(String, Int)],
@@ -211,7 +211,7 @@ class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
         def apply(
                    tuple: Tuple,
                    window: TimeWindow,
-                   values: (String, Int),
+                   values: Iterable[(String, Int)],
                    out: Collector[(String, Int)]) { }
       })
 
@@ -236,7 +236,7 @@ class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
         def apply(
                    tuple: Tuple,
                    window: TimeWindow,
-                   values: (String, Int),
+                   values: Iterable[(String, Int)],
                    out: Collector[(String, Int)]) { }
       })
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
index 673d7b3..48ff640 100644
--- 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
+++ 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit
 
 import org.apache.flink.api.common.state.{ListStateDescriptor, 
ReducingStateDescriptor}
 import org.apache.flink.api.java.tuple.Tuple
-import org.apache.flink.streaming.api.functions.windowing.WindowFunction
+import org.apache.flink.streaming.api.scala.function.WindowFunction
 import org.apache.flink.streaming.api.transformations.OneInputTransformation
 import 
org.apache.flink.streaming.api.windowing.assigners.{SlidingProcessingTimeWindows,
 TumblingTimeWindows, SlidingTimeWindows}
 import org.apache.flink.streaming.api.windowing.evictors.{CountEvictor, 
TimeEvictor}
@@ -65,7 +65,7 @@ class WindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
     val window2 = source
       .keyBy(0)
       .timeWindow(Time.minutes(1))
-      .apply(new WindowFunction[Iterable[(String, Int)], (String, Int), Tuple, 
TimeWindow]() {
+      .apply(new WindowFunction[(String, Int), (String, Int), Tuple, 
TimeWindow]() {
         def apply(
             key: Tuple,
             window: TimeWindow,
@@ -114,7 +114,7 @@ class WindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
       .keyBy(0)
       .window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
       .trigger(CountTrigger.of(100))
-      .apply(new WindowFunction[Iterable[(String, Int)], (String, Int), Tuple, 
TimeWindow]() {
+      .apply(new WindowFunction[(String, Int), (String, Int), Tuple, 
TimeWindow]() {
       def apply(
                     tuple: Tuple,
                     window: TimeWindow,
@@ -168,7 +168,7 @@ class WindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
       .window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
       .trigger(CountTrigger.of(100))
       .evictor(CountEvictor.of(1000))
-      .apply(new WindowFunction[Iterable[(String, Int)], (String, Int), Tuple, 
TimeWindow]() {
+      .apply(new WindowFunction[(String, Int), (String, Int), Tuple, 
TimeWindow]() {
       def apply(
                     tuple: Tuple,
                     window: TimeWindow,
@@ -207,7 +207,7 @@ class WindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
         def apply(
                    tuple: Tuple,
                    window: TimeWindow,
-                   values: (String, Int),
+                   values: Iterable[(String, Int)],
                    out: Collector[(String, Int)]) { }
       })
 
@@ -232,7 +232,7 @@ class WindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
         def apply(
                    tuple: Tuple,
                    window: TimeWindow,
-                   values: (String, Int),
+                   values: Iterable[(String, Int)],
                    out: Collector[(String, Int)]) { }
       })
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
index 436dd0d..d18a45e 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
@@ -105,7 +105,7 @@ public class EventTimeAllWindowCheckpointingITCase extends 
TestLogger {
                                                        NUM_ELEMENTS_PER_KEY / 
3))
                                        .rebalance()
                                        .timeWindowAll(Time.of(WINDOW_SIZE, 
MILLISECONDS))
-                                       .apply(new 
RichAllWindowFunction<Iterable<Tuple2<Long, IntType>>, Tuple4<Long, Long, Long, 
IntType>, TimeWindow>() {
+                                       .apply(new 
RichAllWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, 
TimeWindow>() {
 
                                                private boolean open = false;
 
@@ -167,7 +167,7 @@ public class EventTimeAllWindowCheckpointingITCase extends 
TestLogger {
                                        .addSource(new FailingSource(NUM_KEYS, 
NUM_ELEMENTS_PER_KEY, NUM_ELEMENTS_PER_KEY / 3))
                                        .rebalance()
                                        .timeWindowAll(Time.of(WINDOW_SIZE, 
MILLISECONDS), Time.of(WINDOW_SLIDE, MILLISECONDS))
-                                       .apply(new 
RichAllWindowFunction<Iterable<Tuple2<Long, IntType>>, Tuple4<Long, Long, Long, 
IntType>, TimeWindow>() {
+                                       .apply(new 
RichAllWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, 
TimeWindow>() {
 
                                                private boolean open = false;
 
@@ -254,13 +254,18 @@ public class EventTimeAllWindowCheckpointingITCase 
extends TestLogger {
                                                @Override
                                                public void apply(
                                                                TimeWindow 
window,
-                                                               Tuple2<Long, 
IntType> input,
+                                                               
Iterable<Tuple2<Long, IntType>> input,
                                                                
Collector<Tuple4<Long, Long, Long, IntType>> out) {
 
                                                        // validate that the 
function has been opened properly
                                                        assertTrue(open);
 
-                                                       out.collect(new 
Tuple4<>(input.f0, window.getStart(), window.getEnd(), input.f1));
+                                                       for (Tuple2<Long, 
IntType> in: input) {
+                                                               out.collect(new 
Tuple4<>(in.f0,
+                                                                               
window.getStart(),
+                                                                               
window.getEnd(),
+                                                                               
in.f1));
+                                                       }
                                                }
                                        })
                                        .addSink(new ValidatingSink(NUM_KEYS, 
NUM_ELEMENTS_PER_KEY / WINDOW_SIZE)).setParallelism(1);
@@ -323,13 +328,18 @@ public class EventTimeAllWindowCheckpointingITCase 
extends TestLogger {
                                                @Override
                                                public void apply(
                                                                TimeWindow 
window,
-                                                               Tuple2<Long, 
IntType> input,
+                                                               
Iterable<Tuple2<Long, IntType>> input,
                                                                
Collector<Tuple4<Long, Long, Long, IntType>> out) {
 
                                                        // validate that the 
function has been opened properly
                                                        assertTrue(open);
 
-                                                       out.collect(new 
Tuple4<>(input.f0, window.getStart(), window.getEnd(), input.f1));
+                                                       for (Tuple2<Long, 
IntType> in: input) {
+                                                               out.collect(new 
Tuple4<>(in.f0,
+                                                                               
window.getStart(),
+                                                                               
window.getEnd(),
+                                                                               
in.f1));
+                                                       }
                                                }
                                        })
                                        .addSink(new ValidatingSink(NUM_KEYS, 
NUM_ELEMENTS_PER_KEY / WINDOW_SLIDE)).setParallelism(1);

http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
index 6f178d8..ce705e1 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
@@ -152,7 +152,7 @@ public class EventTimeWindowCheckpointingITCase extends 
TestLogger {
                                        .rebalance()
                                        .keyBy(0)
                                        .timeWindow(Time.of(WINDOW_SIZE, 
MILLISECONDS))
-                                       .apply(new 
RichWindowFunction<Iterable<Tuple2<Long, IntType>>, Tuple4<Long, Long, Long, 
IntType>, Tuple, TimeWindow>() {
+                                       .apply(new 
RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, 
Tuple, TimeWindow>() {
 
                                                private boolean open = false;
 
@@ -216,7 +216,7 @@ public class EventTimeWindowCheckpointingITCase extends 
TestLogger {
                                        .rebalance()
                                        .keyBy(0)
                                        .timeWindow(Time.of(WINDOW_SIZE, 
MILLISECONDS))
-                                       .apply(new 
RichWindowFunction<Iterable<Tuple2<Long, IntType>>, Tuple4<Long, Long, Long, 
IntType>, Tuple, TimeWindow>() {
+                                       .apply(new 
RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, 
Tuple, TimeWindow>() {
 
                                                private boolean open = false;
 
@@ -285,7 +285,7 @@ public class EventTimeWindowCheckpointingITCase extends 
TestLogger {
                                        .rebalance()
                                        .keyBy(0)
                                        .timeWindow(Time.of(WINDOW_SIZE, 
MILLISECONDS), Time.of(WINDOW_SLIDE, MILLISECONDS))
-                                       .apply(new 
RichWindowFunction<Iterable<Tuple2<Long, IntType>>, Tuple4<Long, Long, Long, 
IntType>, Tuple, TimeWindow>() {
+                                       .apply(new 
RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, 
Tuple, TimeWindow>() {
 
                                                private boolean open = false;
 
@@ -373,13 +373,18 @@ public class EventTimeWindowCheckpointingITCase extends 
TestLogger {
                                                public void apply(
                                                                Tuple tuple,
                                                                TimeWindow 
window,
-                                                               Tuple2<Long, 
IntType> input,
+                                                               
Iterable<Tuple2<Long, IntType>> input,
                                                                
Collector<Tuple4<Long, Long, Long, IntType>> out) {
 
                                                        // validate that the 
function has been opened properly
                                                        assertTrue(open);
 
-                                                       out.collect(new 
Tuple4<>(input.f0, window.getStart(), window.getEnd(), input.f1));
+                                                       for (Tuple2<Long, 
IntType> in: input) {
+                                                               out.collect(new 
Tuple4<>(in.f0,
+                                                                               
window.getStart(),
+                                                                               
window.getEnd(),
+                                                                               
in.f1));
+                                                       }
                                                }
                                        })
                                        .addSink(new ValidatingSink(NUM_KEYS, 
NUM_ELEMENTS_PER_KEY / WINDOW_SIZE)).setParallelism(1);
@@ -443,13 +448,18 @@ public class EventTimeWindowCheckpointingITCase extends 
TestLogger {
                                                public void apply(
                                                                Tuple tuple,
                                                                TimeWindow 
window,
-                                                               Tuple2<Long, 
IntType> input,
+                                                               
Iterable<Tuple2<Long, IntType>> input,
                                                                
Collector<Tuple4<Long, Long, Long, IntType>> out) {
 
                                                        // validate that the 
function has been opened properly
                                                        assertTrue(open);
 
-                                                       out.collect(new 
Tuple4<>(input.f0, window.getStart(), window.getEnd(), input.f1));
+                                                       for (Tuple2<Long, 
IntType> in: input) {
+                                                               out.collect(new 
Tuple4<>(in.f0,
+                                                                               
window.getStart(),
+                                                                               
window.getEnd(),
+                                                                               
in.f1));
+                                                       }
                                                }
                                        })
                                        .addSink(new ValidatingSink(NUM_KEYS, 
NUM_ELEMENTS_PER_KEY / WINDOW_SLIDE)).setParallelism(1);

http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
index eb5ef5a..aa5ff3b 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
@@ -117,7 +117,7 @@ public class WindowCheckpointingITCase extends TestLogger {
                                        .rebalance()
                                        .keyBy(0)
                                        .timeWindow(Time.of(100, MILLISECONDS))
-                                       .apply(new 
RichWindowFunction<Iterable<Tuple2<Long, IntType>>, Tuple2<Long, IntType>, 
Tuple, TimeWindow>() {
+                                       .apply(new 
RichWindowFunction<Tuple2<Long, IntType>, Tuple2<Long, IntType>, Tuple, 
TimeWindow>() {
 
                                                private boolean open = false;
 
@@ -175,7 +175,7 @@ public class WindowCheckpointingITCase extends TestLogger {
                                        .rebalance()
                                        .keyBy(0)
                                        .timeWindow(Time.of(150, MILLISECONDS), 
Time.of(50, MILLISECONDS))
-                                       .apply(new 
RichWindowFunction<Iterable<Tuple2<Long, IntType>>, Tuple2<Long, IntType>, 
Tuple, TimeWindow>() {
+                                       .apply(new 
RichWindowFunction<Tuple2<Long, IntType>, Tuple2<Long, IntType>, Tuple, 
TimeWindow>() {
 
                                                private boolean open = false;
 

Reply via email to