This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 09d327d0505c7b03ea757b4fdf2d74f6e91dd8e2
Author: Aljoscha Krettek <[email protected]>
AuthorDate: Tue Oct 2 12:01:57 2018 +0200

    [FLINK-7811] Disambiguate method calls for Scala 2.12 support
    
    These changes are required for Scala 2.12 support and also compile
    under Scala 2.11. With Scala 2.12, function literals became eligible
    for SAM conversion, so a lambda passed to a method that is overloaded
    on both a Scala function type and a Java SAM interface can match more
    than one overload. Annotating the lambda's parameter types explicitly
    makes the call unambiguous again.
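
    As a minimal, self-contained sketch of the problem (the types below
    are hypothetical stand-ins, not Flink's actual API), the following
    overload set accepts an untyped lambda on 2.11 but is reported as
    ambiguous on 2.12 until the parameter types are written out:

        // Hypothetical stand-ins that reproduce the overload shape of
        // Flink's Scala DataSet API.
        trait Collector[R] { def collect(record: R): Unit }
        trait MapPartitionFunction[T, R] {
          def mapPartition(values: java.lang.Iterable[T], out: Collector[R]): Unit
        }

        class DataSetLike[T] {
          // One overload takes a Scala function, the other a Java-style
          // SAM interface.
          def mapPartition[R](fun: (Iterator[T], Collector[R]) => Unit): Unit = ()
          def mapPartition[R](fun: MapPartitionFunction[T, R]): Unit = ()
        }

        object Demo {
          val ds = new DataSetLike[String]
          // On 2.12 an untyped lambda is also SAM-convertible to
          // MapPartitionFunction, so this call no longer resolves:
          //   ds.mapPartition { (it, out) => out.collect(it.size) }
          // Annotating the parameter types rules the SAM overload out,
          // because Iterator does not conform to java.lang.Iterable:
          ds.mapPartition { (it: Iterator[String], out: Collector[Int]) =>
            out.collect(it.size)
          }
        }

    The diff below applies that kind of explicit annotation across the
    CEP, DataSet extension, window translation, and operator test code.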
---
 .../flink/cep/scala/pattern/PatternTest.scala      |   5 +-
 .../impl/acceptPartialFunctions/OnDataSet.scala    |   5 +-
 .../acceptPartialFunctions/OnGroupedDataSet.scala  |   5 +-
 .../org/apache/flink/api/scala/utils/package.scala |   8 +-
 .../api/scala/AllWindowTranslationTest.scala       |  28 +++--
 .../api/scala/WindowTranslationTest.scala          |  20 +++-
 .../flink/api/scala/operators/CoGroupITCase.scala  | 131 +++++++++++++--------
 .../api/scala/operators/GroupCombineITCase.scala   |  13 +-
 .../api/scala/operators/GroupReduceITCase.scala    |  84 +++++++------
 9 files changed, 181 insertions(+), 118 deletions(-)

diff --git a/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala b/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala
index d574513..33d24de 100644
--- a/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala
+++ b/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala
@@ -24,6 +24,7 @@ import org.apache.flink.cep.Event
 import org.apache.flink.cep.SubEvent
 import org.apache.flink.cep.pattern.Quantifier.ConsumingStrategy
 import org.apache.flink.cep.pattern.conditions._
+import org.apache.flink.cep.scala.conditions.Context
 
 class PatternTest {
 
@@ -81,9 +82,9 @@ class PatternTest {
   def testStrictContiguityWithCondition(): Unit = {
     val pattern = Pattern.begin[Event]("start")
       .next("next")
-      .where((value: Event, _) => value.getName == "foobar")
+      .where((value: Event, _: Context[Event]) => value.getName == "foobar")
       .next("end")
-      .where((value: Event, _) => value.getId == 42)
+      .where((value: Event, _: Context[Event]) => value.getId == 42)
 
     val jPattern = JPattern.begin[Event]("start")
       .next("next")
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnDataSet.scala
index b2521b0..9f28c3d 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnDataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnDataSet.scala
@@ -20,6 +20,7 @@ package org.apache.flink.api.scala.extensions.impl.acceptPartialFunctions
 import org.apache.flink.annotation.PublicEvolving
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala.{DataSet, GroupedDataSet}
+import org.apache.flink.util.Collector
 
 import scala.reflect.ClassTag
 
@@ -53,7 +54,7 @@ class OnDataSet[T](ds: DataSet[T]) {
   @PublicEvolving
   def mapPartitionWith[R: TypeInformation: ClassTag](fun: Stream[T] => R): DataSet[R] =
     ds.mapPartition {
-      (it, out) =>
+      (it: Iterator[T], out: Collector[R]) =>
         out.collect(fun(it.toStream))
     }
 
@@ -100,7 +101,7 @@ class OnDataSet[T](ds: DataSet[T]) {
   @PublicEvolving
   def reduceGroupWith[R: TypeInformation: ClassTag](fun: Stream[T] => R): DataSet[R] =
     ds.reduceGroup {
-      (it, out) =>
+      (it: Iterator[T], out: Collector[R]) =>
         out.collect(fun(it.toStream))
     }
 
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnGroupedDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnGroupedDataSet.scala
index 07abccb..3636358 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnGroupedDataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnGroupedDataSet.scala
@@ -21,6 +21,7 @@ import org.apache.flink.annotation.PublicEvolving
 import org.apache.flink.api.common.operators.Order
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala.{DataSet, GroupedDataSet}
+import org.apache.flink.util.Collector
 
 import scala.reflect.ClassTag
 
@@ -65,7 +66,7 @@ class OnGroupedDataSet[T](ds: GroupedDataSet[T]) {
   @PublicEvolving
   def reduceGroupWith[R: TypeInformation: ClassTag](fun: Stream[T] => R): DataSet[R] =
     ds.reduceGroup {
-      (it, out) =>
+      (it: Iterator[T], out: Collector[R]) =>
         out.collect(fun(it.toStream))
     }
 
@@ -80,7 +81,7 @@ class OnGroupedDataSet[T](ds: GroupedDataSet[T]) {
   @PublicEvolving
   def combineGroupWith[R: TypeInformation: ClassTag](fun: Stream[T] => R): DataSet[R] =
     ds.combineGroup {
-      (it, out) =>
+      (it: Iterator[T], out: Collector[R]) =>
         out.collect(fun(it.toStream))
     }
 
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala
index d543998..04bdab6 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala
@@ -72,7 +72,9 @@ package object utils {
         BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
         implicitly[TypeInformation[T]]
       )
-      wrap(jutils.zipWithIndex(self.javaSet)).map { t => (t.f0.toLong, t.f1) }
+      wrap(jutils.zipWithIndex(self.javaSet)).map {
+        t: org.apache.flink.api.java.tuple.Tuple2[java.lang.Long, T] => (t.f0.toLong, t.f1)
+      }
     }
 
     /**
@@ -85,7 +87,9 @@ package object utils {
         BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
         implicitly[TypeInformation[T]]
       )
-      wrap(jutils.zipWithUniqueId(self.javaSet)).map { t => (t.f0.toLong, t.f1) }
+      wrap(jutils.zipWithUniqueId(self.javaSet)).map {
+        t: org.apache.flink.api.java.tuple.Tuple2[java.lang.Long, T] => (t.f0.toLong, t.f1)
+      }
     }
 
     // --------------------------------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
index e8d5a12..e4a81a7 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
@@ -322,9 +322,10 @@ class AllWindowTranslationTest {
     val window1 = source
       .windowAll(TumblingEventTimeWindows.of(Time.seconds(1)))
       .reduce(
-        new DummyReducer, new AllWindowFunction[(String, Int), (String, Int), TimeWindow] {
-          override def apply(
-              window: TimeWindow,
+        new DummyReducer,
+        new ProcessAllWindowFunction[(String, Int), (String, Int), TimeWindow] {
+          override def process(
+              context: Context,
               input: Iterable[(String, Int)],
               out: Collector[(String, Int)]): Unit = input foreach ( x => out.collect(x))
         })
@@ -361,9 +362,9 @@ class AllWindowTranslationTest {
     val window1 = source
       .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1)))
       .reduce(
-        new DummyReducer, new AllWindowFunction[(String, Int), (String, Int), TimeWindow] {
-          override def apply(
-              window: TimeWindow,
+        new DummyReducer, new ProcessAllWindowFunction[(String, Int), (String, Int), TimeWindow] {
+          override def process(
+              context: Context,
               input: Iterable[(String, Int)],
               out: Collector[(String, Int)]): Unit = input foreach ( x => out.collect(x))
         })
@@ -519,7 +520,10 @@ class AllWindowTranslationTest {
       .windowAll(TumblingEventTimeWindows.of(Time.seconds(1)))
       .reduce(
         { (x, _) => x },
-        { (_, in, out: Collector[(String, Int)]) => in foreach { x => out.collect(x)} })
+        {
+          (_: TimeWindow, in: Iterable[(String, Int)], out: Collector[(String, Int)]) =>
+            in foreach { x => out.collect(x)}
+        })
 
     val transform = window1
       .javaStream
@@ -756,7 +760,7 @@ class AllWindowTranslationTest {
       .windowAll(TumblingEventTimeWindows.of(Time.seconds(1)))
       .aggregate(
         new DummyAggregator(),
-        { (_, in: Iterable[(String, Int)], out: Collector[(String, Int)]) => {
+        { (_: TimeWindow, in: Iterable[(String, Int)], out: Collector[(String, Int)]) => {
           in foreach { x => out.collect(x)}
         } })
 
@@ -898,9 +902,9 @@ class AllWindowTranslationTest {
       .fold(
         ("", "", 1),
         new DummyFolder,
-        new AllWindowFunction[(String, String, Int), (String, Int), TimeWindow] {
-          override def apply(
-              window: TimeWindow,
+        new ProcessAllWindowFunction[(String, String, Int), (String, Int), TimeWindow] {
+          override def process(
+              context: Context,
               input: Iterable[(String, String, Int)],
               out: Collector[(String, Int)]): Unit = input foreach {x => out.collect((x._1, x._3))}
         })
@@ -1104,7 +1108,7 @@ class AllWindowTranslationTest {
       .fold(
         ("", "", 1),
         { (acc: (String, String, Int), _) => acc },
-        { (_, in: Iterable[(String, String, Int)], out: Collector[(String, Int)]) =>
+        { (_: TimeWindow, in: Iterable[(String, String, Int)], out: Collector[(String, Int)]) =>
           in foreach { x => out.collect((x._1, x._3)) }
         })
 
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
index 916884f..b501d9e 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
@@ -582,7 +582,10 @@ class WindowTranslationTest {
       .window(TumblingEventTimeWindows.of(Time.seconds(1)))
       .reduce(
         { (x, _) => x },
-        { (_, _, in, out: Collector[(String, Int)]) => in foreach { x => out.collect(x)} })
+        {
+          (_: String, _: TimeWindow, in: Iterable[(String, Int)], out: Collector[(String, Int)]) =>
+            in foreach { x => out.collect(x)}
+        })
 
     val transform = window1
       .javaStream
@@ -824,10 +827,11 @@ class WindowTranslationTest {
     val window1 = source
       .keyBy(_._1)
       .window(TumblingEventTimeWindows.of(Time.seconds(1)))
-      .aggregate(new DummyAggregator(),
-        { (_, _, in: Iterable[(String, Int)], out: Collector[(String, Int)]) => {
+      .aggregate(
+        new DummyAggregator(),
+        { (_: String, _: TimeWindow, in: Iterable[(String, Int)], out: Collector[(String, Int)]) =>
           in foreach { x => out.collect(x)}
-        } })
+        })
 
     val transform = window1
       .javaStream
@@ -1232,8 +1236,12 @@ class WindowTranslationTest {
       .fold(
         ("", "", 1),
         { (acc: (String, String, Int), _) => acc },
-        { (_, _, in: Iterable[(String, String, Int)], out: Collector[(String, Int)]) =>
-          in foreach { x => out.collect((x._1, x._3)) }
+        { (
+            _: String,
+            _: TimeWindow,
+            in: Iterable[(String, String, Int)],
+            out: Collector[(String, Int)]) =>
+              in foreach { x => out.collect((x._1, x._3)) }
         })
 
     val transform = window1
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala
index 448479e..7746cd0 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala
@@ -108,12 +108,15 @@ class CoGroupITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
     val ds = CollectionDataSets.get3TupleDataSet(env)
     val ds2 = CollectionDataSets.get3TupleDataSet(env)
     val coGroupDs = ds.coGroup(ds2).where(0).equalTo(0) {
-      (first, second, out: Collector[(Int, Long, String)] ) =>
-        for (t <- first) {
-          if (t._1 < 6) {
-            out.collect(t)
+      (
+        first: Iterator[(Int, Long, String)],
+        second: Iterator[(Int, Long, String)],
+        out: Collector[(Int, Long, String)] ) =>
+          for (t <- first) {
+            if (t._1 < 6) {
+              out.collect(t)
+            }
           }
-        }
     }
     coGroupDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
     env.execute()
@@ -127,12 +130,15 @@ class CoGroupITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
     val ds = CollectionDataSets.get5TupleDataSet(env)
     val ds2 = CollectionDataSets.get5TupleDataSet(env)
     val coGroupDs = ds.coGroup(ds2).where(0).equalTo(0) {
-      (first, second, out: Collector[(Int, Long, Int, String, Long)]) =>
-        for (t <- second) {
-          if (t._1 < 4) {
-            out.collect(t)
+      (
+        first: Iterator[(Int, Long, Int, String, Long)],
+        second: Iterator[(Int, Long, Int, String, Long)],
+        out: Collector[(Int, Long, Int, String, Long)]) =>
+          for (t <- second) {
+            if (t._1 < 4) {
+              out.collect(t)
+            }
           }
-        }
     }
     coGroupDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
     env.execute()
@@ -247,13 +253,16 @@ class CoGroupITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
     val ds1 = CollectionDataSets.get5TupleDataSet(env)
     val ds2 = CollectionDataSets.get3TupleDataSet(env)
     val coGrouped = ds1.coGroup(ds2).where(0,4).equalTo(0, 1) {
-      (first, second, out: Collector[(Int, Long, String)]) =>
-        val strs = first map(_._4)
-        for (t <- second) {
-          for (s <- strs) {
-            out.collect((t._1, t._2, s))
+      (
+        first: Iterator[(Int, Long, Int, String, Long)],
+        second: Iterator[(Int, Long, String)],
+        out: Collector[(Int, Long, String)]) =>
+          val strs = first map(_._4)
+          for (t <- second) {
+            for (s <- strs) {
+              out.collect((t._1, t._2, s))
+            }
           }
-        }
     }
 
     coGrouped.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
@@ -273,13 +282,16 @@ class CoGroupITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
     val ds2 = CollectionDataSets.get3TupleDataSet(env)
     val coGrouped = ds1.coGroup(ds2).where(t => (t._1, t._5)).equalTo(t => (t._1, t._2))
       .apply {
-      (first, second, out: Collector[(Int, Long, String)]) =>
-        val strs = first map(_._4)
-        for (t <- second) {
-          for (s <- strs) {
-            out.collect((t._1, t._2, s))
+      (
+        first: Iterator[(Int, Long, Int, String, Long)],
+        second: Iterator[(Int, Long, String)],
+        out: Collector[(Int, Long, String)]) =>
+          val strs = first map(_._4)
+          for (t <- second) {
+            for (s <- strs) {
+              out.collect((t._1, t._2, s))
+            }
           }
-        }
     }
 
     coGrouped.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
@@ -325,13 +337,16 @@ class CoGroupITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
     val ds = CollectionDataSets.getSmallPojoDataSet(env)
     val ds2 = CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env)
     val coGroupDs = ds.coGroup(ds2).where("nestedPojo.longNumber").equalTo(6) {
-      (first, second, out: Collector[CustomType]) =>
-        for (p <- first) {
-          for (t <- second) {
-            Assert.assertTrue(p.nestedPojo.longNumber == t._7)
-            out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink"))
+      (
+        first: Iterator[CollectionDataSets.POJO],
+        second: Iterator[(Int, String, Int, Int, Long, String, Long)],
+        out: Collector[CustomType]) =>
+          for (p <- first) {
+            for (t <- second) {
+              Assert.assertTrue(p.nestedPojo.longNumber == t._7)
+              out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink"))
+            }
           }
-        }
     }
     coGroupDs.writeAsText(resultPath, writeMode = WriteMode.OVERWRITE)
     env.execute()
@@ -348,13 +363,16 @@ class CoGroupITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
     val ds = CollectionDataSets.getSmallPojoDataSet(env)
     val ds2 = CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env)
     val coGroupDs = ds.coGroup(ds2).where(t => new Tuple1(t.nestedPojo.longNumber)).equalTo(6) {
-      (first, second, out: Collector[CustomType]) =>
-        for (p <- first) {
-          for (t <- second) {
-            Assert.assertTrue(p.nestedPojo.longNumber == t._7)
-            out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink"))
+      (
+        first: Iterator[CollectionDataSets.POJO],
+        second: Iterator[(Int, String, Int, Int, Long, String, Long)],
+        out: Collector[CustomType]) =>
+          for (p <- first) {
+            for (t <- second) {
+              Assert.assertTrue(p.nestedPojo.longNumber == t._7)
+              out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink"))
+            }
           }
-        }
     }
     coGroupDs.writeAsText(resultPath, writeMode = WriteMode.OVERWRITE)
     env.execute()
@@ -371,13 +389,16 @@ class CoGroupITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
     val ds = CollectionDataSets.getSmallPojoDataSet(env)
     val ds2 = CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env)
     val coGroupDs = ds.coGroup(ds2).where(_.nestedPojo.longNumber).equalTo(6) {
-      (first, second, out: Collector[CustomType]) =>
-        for (p <- first) {
-          for (t <- second) {
-            Assert.assertTrue(p.nestedPojo.longNumber == t._7)
-            out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink"))
+      (
+        first: Iterator[CollectionDataSets.POJO],
+        second: Iterator[(Int, String, Int, Int, Long, String, Long)],
+        out: Collector[CustomType]) =>
+          for (p <- first) {
+            for (t <- second) {
+              Assert.assertTrue(p.nestedPojo.longNumber == t._7)
+              out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink"))
+            }
           }
-        }
     }
     coGroupDs.writeAsText(resultPath, writeMode = WriteMode.OVERWRITE)
     env.execute()
@@ -390,14 +411,17 @@ class CoGroupITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
     val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
     val ds2 = env.fromElements(0, 1, 2)
     val coGroupDs = ds1.coGroup(ds2).where(0).equalTo("*") {
-      (first, second, out: Collector[(Int, Long, String)]) =>
-        for (p <- first) {
-          for (t <- second) {
-            if (p._1 == t) {
-              out.collect(p)
+      (
+        first: Iterator[(Int, Long, String)],
+        second: Iterator[Int],
+        out: Collector[(Int, Long, String)]) =>
+          for (p <- first) {
+            for (t <- second) {
+              if (p._1 == t) {
+                out.collect(p)
+              }
             }
           }
-        }
     }
 
     coGroupDs.writeAsText(resultPath, writeMode = WriteMode.OVERWRITE)
@@ -411,14 +435,17 @@ class CoGroupITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
     val ds1 = env.fromElements(0, 1, 2)
     val ds2 = CollectionDataSets.getSmall3TupleDataSet(env)
     val coGroupDs = ds1.coGroup(ds2).where("*").equalTo(0) {
-      (first, second, out: Collector[(Int, Long, String)]) =>
-        for (p <- first) {
-          for (t <- second) {
-            if (p == t._1) {
-              out.collect(t)
+      (
+        first: Iterator[Int],
+        second: Iterator[(Int, Long, String)],
+        out: Collector[(Int, Long, String)]) =>
+          for (p <- first) {
+            for (t <- second) {
+              if (p == t._1) {
+                out.collect(t)
+              }
             }
           }
-        }
     }
 
     coGroupDs.writeAsText(resultPath, writeMode = WriteMode.OVERWRITE)
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala
index 9d286fe..fabefb9 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala
@@ -46,7 +46,10 @@ class GroupCombineITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
     ds.combineGroup(new ScalaGroupCombineFunctionExample())
       .output(new DiscardingOutputFormat[Tuple1[String]])
 
-    ds.combineGroup((in, out: Collector[Tuple1[String]]) => in.toSet foreach (out.collect))
+    ds
+      .combineGroup(
+        (in: Iterator[Tuple1[String]], out: Collector[Tuple1[String]]) =>
+          in.toSet foreach (out.collect))
       .output(new DiscardingOutputFormat[Tuple1[String]])
 
     // all methods on UnsortedGrouping
@@ -55,7 +58,9 @@ class GroupCombineITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
       .output(new DiscardingOutputFormat[Tuple1[String]])
 
     ds.groupBy(0)
-      .combineGroup((in, out: Collector[Tuple1[String]]) => in.toSet foreach (out.collect))
+      .combineGroup(
+        (in: Iterator[Tuple1[String]], out: Collector[Tuple1[String]]) =>
+          in.toSet foreach (out.collect))
       .output(new DiscardingOutputFormat[Tuple1[String]])
 
     // all methods on SortedGrouping
@@ -64,7 +69,9 @@ class GroupCombineITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
       .output(new DiscardingOutputFormat[Tuple1[String]])
 
     ds.groupBy(0).sortGroup(0, Order.ASCENDING)
-      .combineGroup((in, out: Collector[Tuple1[String]]) => in.toSet foreach (out.collect))
+      .combineGroup(
+        (in: Iterator[Tuple1[String]], out: Collector[Tuple1[String]]) =>
+          in.toSet foreach (out.collect))
       .output(new DiscardingOutputFormat[Tuple1[String]])
 
     env.execute()
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala
index 385691e..9262cb3 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala
@@ -269,17 +269,19 @@ class GroupReduceITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
       .map( t => MutableTuple3(t._1, t._2, t._3) )
     
     val reduceDs =  ds.groupBy(1).reduceGroup {
-      (in, out: Collector[MutableTuple3[Int, Long, String]]) =>
-        for (t <- in) {
-          if (t._1 < 4) {
-            t._3 = "Hi!"
-            t._1 += 10
-            out.collect(t)
-            t._1 += 10
-            t._3 = "Hi again!"
-            out.collect(t)
+      (
+        in: Iterator[MutableTuple3[Int, Long, String]],
+        out: Collector[MutableTuple3[Int, Long, String]]) =>
+          for (t <- in) {
+            if (t._1 < 4) {
+              t._3 = "Hi!"
+              t._1 += 10
+              out.collect(t)
+              t._1 += 10
+              t._3 = "Hi again!"
+              out.collect(t)
+            }
           }
-        }
     }
     val result: Seq[String] = reduceDs.collect().map(x => s"${x._1},${x._2},${x._3}").sorted
     
@@ -488,13 +490,15 @@ class GroupReduceITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
     val ds =  CollectionDataSets.getPojoContainingTupleAndWritable(env)
     
     val reduceDs = ds.groupBy("hadoopFan", "theTuple.*").reduceGroup {
-      (values, out: Collector[Int]) => {
-        var c: Int = 0
-        for (v <- values) {
-          c += 1
+      (
+        values: Iterator[CollectionDataSets.PojoContainingTupleAndWritable],
+        out: Collector[Int]) => {
+          var c: Int = 0
+          for (v <- values) {
+            c += 1
+          }
+          out.collect(c)
         }
-        out.collect(c)
-      }
     }
     
     val result: Seq[Int] = reduceDs.collect().sorted
@@ -511,9 +515,11 @@ class GroupReduceITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
     val ds = CollectionDataSets.getTupleContainingPojos(env)
     
     val reduceDs =  ds.groupBy("_1", "_2.*").reduceGroup {
-      (values, out: Collector[Int]) => {
-        out.collect(values.size)
-      }
+      (
+        values: Iterator[(Int, CollectionDataSets.CrazyNested, CollectionDataSets.POJO)],
+        out: Collector[Int]) => {
+          out.collect(values.size)
+        }
     }
     
     val result: Seq[Int] = reduceDs.collect().sorted
@@ -643,20 +649,22 @@ class GroupReduceITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
       .sortGroup("theTuple._1", Order.DESCENDING)
       .sortGroup("theTuple._2", Order.DESCENDING)
       .reduceGroup {
-        (values, out: Collector[String]) => {
-          var once: Boolean = false
-          val concat: StringBuilder = new StringBuilder
-          for (value <- values) {
-            if (!once) {
-              concat.append(value.hadoopFan.get)
-              concat.append("---")
-              once = true
+        (
+          values: Iterator[CollectionDataSets.PojoContainingTupleAndWritable],
+          out: Collector[String]) => {
+            var once: Boolean = false
+            val concat: StringBuilder = new StringBuilder
+            for (value <- values) {
+              if (!once) {
+                concat.append(value.hadoopFan.get)
+                concat.append("---")
+                once = true
+              }
+              concat.append(value.theTuple)
+              concat.append("-")
             }
-            concat.append(value.theTuple)
-            concat.append("-")
+            out.collect(concat.toString())
           }
-          out.collect(concat.toString())
-        }
       }
     
     val result: Seq[String] = reduceDs.map(_.toString()).collect().sorted
@@ -803,13 +811,15 @@ class GroupReduceITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
     val ds =  CollectionDataSets.getPojoWithMultiplePojos(env)
     
     val reduceDs =  ds.groupBy("p2.a2").reduceGroup {
-      (values, out: Collector[String]) => {
-        val concat: StringBuilder = new StringBuilder()
-        for (value <- values) {
-          concat.append(value.p2.a2)
+      (
+        values: Iterator[CollectionDataSets.PojoWithMultiplePojos],
+        out: Collector[String]) => {
+          val concat: StringBuilder = new StringBuilder()
+          for (value <- values) {
+            concat.append(value.p2.a2)
+          }
+          out.collect(concat.toString())
         }
-        out.collect(concat.toString())
-      }
     }
     
     val result : Seq[String] = reduceDs.map(_.toString()).collect().sorted
