This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 09d327d0505c7b03ea757b4fdf2d74f6e91dd8e2 Author: Aljoscha Krettek <[email protected]> AuthorDate: Tue Oct 2 12:01:57 2018 +0200 [FLINK-7811] Disambiguate method calls for Scala 2.12 support These changes are required for 2.12 support and also work on Scala 2.11. --- .../flink/cep/scala/pattern/PatternTest.scala | 5 +- .../impl/acceptPartialFunctions/OnDataSet.scala | 5 +- .../acceptPartialFunctions/OnGroupedDataSet.scala | 5 +- .../org/apache/flink/api/scala/utils/package.scala | 8 +- .../api/scala/AllWindowTranslationTest.scala | 28 +++-- .../api/scala/WindowTranslationTest.scala | 20 +++- .../flink/api/scala/operators/CoGroupITCase.scala | 131 +++++++++++++-------- .../api/scala/operators/GroupCombineITCase.scala | 13 +- .../api/scala/operators/GroupReduceITCase.scala | 84 +++++++------ 9 files changed, 181 insertions(+), 118 deletions(-) diff --git a/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala b/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala index d574513..33d24de 100644 --- a/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala +++ b/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala @@ -24,6 +24,7 @@ import org.apache.flink.cep.Event import org.apache.flink.cep.SubEvent import org.apache.flink.cep.pattern.Quantifier.ConsumingStrategy import org.apache.flink.cep.pattern.conditions._ +import org.apache.flink.cep.scala.conditions.Context class PatternTest { @@ -81,9 +82,9 @@ class PatternTest { def testStrictContiguityWithCondition(): Unit = { val pattern = Pattern.begin[Event]("start") .next("next") - .where((value: Event, _) => value.getName == "foobar") + .where((value: Event, _: Context[Event]) => value.getName == "foobar") .next("end") - .where((value: Event, _) => value.getId == 42) + .where((value: Event, _: Context[Event]) => value.getId == 42) val jPattern = JPattern.begin[Event]("start") .next("next") diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnDataSet.scala index b2521b0..9f28c3d 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnDataSet.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnDataSet.scala @@ -20,6 +20,7 @@ package org.apache.flink.api.scala.extensions.impl.acceptPartialFunctions import org.apache.flink.annotation.PublicEvolving import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.scala.{DataSet, GroupedDataSet} +import org.apache.flink.util.Collector import scala.reflect.ClassTag @@ -53,7 +54,7 @@ class OnDataSet[T](ds: DataSet[T]) { @PublicEvolving def mapPartitionWith[R: TypeInformation: ClassTag](fun: Stream[T] => R): DataSet[R] = ds.mapPartition { - (it, out) => + (it: Iterator[T], out: Collector[R]) => out.collect(fun(it.toStream)) } @@ -100,7 +101,7 @@ class OnDataSet[T](ds: DataSet[T]) { @PublicEvolving def reduceGroupWith[R: TypeInformation: ClassTag](fun: Stream[T] => R): DataSet[R] = ds.reduceGroup { - (it, out) => + (it: Iterator[T], out: Collector[R]) => out.collect(fun(it.toStream)) } diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnGroupedDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnGroupedDataSet.scala index 07abccb..3636358 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnGroupedDataSet.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnGroupedDataSet.scala @@ -21,6 +21,7 @@ import org.apache.flink.annotation.PublicEvolving import org.apache.flink.api.common.operators.Order import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.scala.{DataSet, GroupedDataSet} +import org.apache.flink.util.Collector import scala.reflect.ClassTag @@ -65,7 +66,7 @@ class OnGroupedDataSet[T](ds: GroupedDataSet[T]) { @PublicEvolving def reduceGroupWith[R: TypeInformation: ClassTag](fun: Stream[T] => R): DataSet[R] = ds.reduceGroup { - (it, out) => + (it: Iterator[T], out: Collector[R]) => out.collect(fun(it.toStream)) } @@ -80,7 +81,7 @@ class OnGroupedDataSet[T](ds: GroupedDataSet[T]) { @PublicEvolving def combineGroupWith[R: TypeInformation: ClassTag](fun: Stream[T] => R): DataSet[R] = ds.combineGroup { - (it, out) => + (it: Iterator[T], out: Collector[R]) => out.collect(fun(it.toStream)) } diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala index d543998..04bdab6 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala @@ -72,7 +72,9 @@ package object utils { BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], implicitly[TypeInformation[T]] ) - wrap(jutils.zipWithIndex(self.javaSet)).map { t => (t.f0.toLong, t.f1) } + wrap(jutils.zipWithIndex(self.javaSet)).map { + t: org.apache.flink.api.java.tuple.Tuple2[java.lang.Long, T] => (t.f0.toLong, t.f1) + } } /** @@ -85,7 +87,9 @@ package object utils { BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], implicitly[TypeInformation[T]] ) - wrap(jutils.zipWithUniqueId(self.javaSet)).map { t => (t.f0.toLong, t.f1) } + wrap(jutils.zipWithUniqueId(self.javaSet)).map { + t: org.apache.flink.api.java.tuple.Tuple2[java.lang.Long, T]=> (t.f0.toLong, t.f1) + } } // -------------------------------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala index e8d5a12..e4a81a7 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala @@ -322,9 +322,10 @@ class AllWindowTranslationTest { val window1 = source .windowAll(TumblingEventTimeWindows.of(Time.seconds(1))) .reduce( - new DummyReducer, new AllWindowFunction[(String, Int), (String, Int), TimeWindow] { - override def apply( - window: TimeWindow, + new DummyReducer, + new ProcessAllWindowFunction[(String, Int), (String, Int), TimeWindow] { + override def process( + context: Context, input: Iterable[(String, Int)], out: Collector[(String, Int)]): Unit = input foreach ( x => out.collect(x)) }) @@ -361,9 +362,9 @@ class AllWindowTranslationTest { val window1 = source .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1))) .reduce( - new DummyReducer, new AllWindowFunction[(String, Int), (String, Int), TimeWindow] { - override def apply( - window: TimeWindow, + new DummyReducer, new ProcessAllWindowFunction[(String, Int), (String, Int), TimeWindow] { + override def process( + context: Context, input: Iterable[(String, Int)], out: Collector[(String, Int)]): Unit = input foreach ( x => out.collect(x)) }) @@ -519,7 +520,10 @@ class AllWindowTranslationTest { .windowAll(TumblingEventTimeWindows.of(Time.seconds(1))) .reduce( { (x, _) => x }, - { (_, in, out: Collector[(String, Int)]) => in foreach { x => out.collect(x)} }) + { + (_: TimeWindow, in: Iterable[(String, Int)], out: Collector[(String, Int)]) => + in foreach { x => out.collect(x)} + }) val transform = window1 .javaStream @@ -756,7 +760,7 @@ class AllWindowTranslationTest { .windowAll(TumblingEventTimeWindows.of(Time.seconds(1))) .aggregate( new DummyAggregator(), - { (_, in: Iterable[(String, Int)], out: Collector[(String, Int)]) => { + { (_: TimeWindow, in: Iterable[(String, Int)], out: Collector[(String, Int)]) => { in foreach { x => out.collect(x)} } }) @@ -898,9 +902,9 @@ class AllWindowTranslationTest { .fold( ("", "", 1), new DummyFolder, - new AllWindowFunction[(String, String, Int), (String, Int), TimeWindow] { - override def apply( - window: TimeWindow, + new ProcessAllWindowFunction[(String, String, Int), (String, Int), TimeWindow] { + override def process( + context: Context, input: Iterable[(String, String, Int)], out: Collector[(String, Int)]): Unit = input foreach {x => out.collect((x._1, x._3))} }) @@ -1104,7 +1108,7 @@ class AllWindowTranslationTest { .fold( ("", "", 1), { (acc: (String, String, Int), _) => acc }, - { (_, in: Iterable[(String, String, Int)], out: Collector[(String, Int)]) => + { (_: TimeWindow, in: Iterable[(String, String, Int)], out: Collector[(String, Int)]) => in foreach { x => out.collect((x._1, x._3)) } }) diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala index 916884f..b501d9e 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala @@ -582,7 +582,10 @@ class WindowTranslationTest { .window(TumblingEventTimeWindows.of(Time.seconds(1))) .reduce( { (x, _) => x }, - { (_, _, in, out: Collector[(String, Int)]) => in foreach { x => out.collect(x)} }) + { + (_: String, _: TimeWindow, in: Iterable[(String, Int)], out: Collector[(String, Int)]) => + in foreach { x => out.collect(x)} + }) val transform = window1 .javaStream @@ -824,10 +827,11 @@ class WindowTranslationTest { val window1 = source .keyBy(_._1) .window(TumblingEventTimeWindows.of(Time.seconds(1))) - .aggregate(new DummyAggregator(), - { (_, _, in: Iterable[(String, Int)], out: Collector[(String, Int)]) => { + .aggregate( + new DummyAggregator(), + { (_: String, _: TimeWindow, in: Iterable[(String, Int)], out: Collector[(String, Int)]) => in foreach { x => out.collect(x)} - } }) + }) val transform = window1 .javaStream @@ -1232,8 +1236,12 @@ class WindowTranslationTest { .fold( ("", "", 1), { (acc: (String, String, Int), _) => acc }, - { (_, _, in: Iterable[(String, String, Int)], out: Collector[(String, Int)]) => - in foreach { x => out.collect((x._1, x._3)) } + { ( + _: String, + _: TimeWindow, + in: Iterable[(String, String, Int)], + out: Collector[(String, Int)]) => + in foreach { x => out.collect((x._1, x._3)) } }) val transform = window1 diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala index 448479e..7746cd0 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala @@ -108,12 +108,15 @@ class CoGroupITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo val ds = CollectionDataSets.get3TupleDataSet(env) val ds2 = CollectionDataSets.get3TupleDataSet(env) val coGroupDs = ds.coGroup(ds2).where(0).equalTo(0) { - (first, second, out: Collector[(Int, Long, String)] ) => - for (t <- first) { - if (t._1 < 6) { - out.collect(t) + ( + first: Iterator[(Int, Long, String)], + second: Iterator[(Int, Long, String)], + out: Collector[(Int, Long, String)] ) => + for (t <- first) { + if (t._1 < 6) { + out.collect(t) + } } - } } coGroupDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE) env.execute() @@ -127,12 +130,15 @@ class CoGroupITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo val ds = CollectionDataSets.get5TupleDataSet(env) val ds2 = CollectionDataSets.get5TupleDataSet(env) val coGroupDs = ds.coGroup(ds2).where(0).equalTo(0) { - (first, second, out: Collector[(Int, Long, Int, String, Long)]) => - for (t <- second) { - if (t._1 < 4) { - out.collect(t) + ( + first: Iterator[(Int, Long, Int, String, Long)], + second: Iterator[(Int, Long, Int, String, Long)], + out: Collector[(Int, Long, Int, String, Long)]) => + for (t <- second) { + if (t._1 < 4) { + out.collect(t) + } } - } } coGroupDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE) env.execute() @@ -247,13 +253,16 @@ class CoGroupITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo val ds1 = CollectionDataSets.get5TupleDataSet(env) val ds2 = CollectionDataSets.get3TupleDataSet(env) val coGrouped = ds1.coGroup(ds2).where(0,4).equalTo(0, 1) { - (first, second, out: Collector[(Int, Long, String)]) => - val strs = first map(_._4) - for (t <- second) { - for (s <- strs) { - out.collect((t._1, t._2, s)) + ( + first: Iterator[(Int, Long, Int, String, Long)], + second: Iterator[(Int, Long, String)], + out: Collector[(Int, Long, String)]) => + val strs = first map(_._4) + for (t <- second) { + for (s <- strs) { + out.collect((t._1, t._2, s)) + } } - } } coGrouped.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE) @@ -273,13 +282,16 @@ class CoGroupITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo val ds2 = CollectionDataSets.get3TupleDataSet(env) val coGrouped = ds1.coGroup(ds2).where(t => (t._1, t._5)).equalTo(t => (t._1, t._2)) .apply { - (first, second, out: Collector[(Int, Long, String)]) => - val strs = first map(_._4) - for (t <- second) { - for (s <- strs) { - out.collect((t._1, t._2, s)) + ( + first: Iterator[(Int, Long, Int, String, Long)], + second: Iterator[(Int, Long, String)], + out: Collector[(Int, Long, String)]) => + val strs = first map(_._4) + for (t <- second) { + for (s <- strs) { + out.collect((t._1, t._2, s)) + } } - } } coGrouped.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE) @@ -325,13 +337,16 @@ class CoGroupITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo val ds = CollectionDataSets.getSmallPojoDataSet(env) val ds2 = CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env) val coGroupDs = ds.coGroup(ds2).where("nestedPojo.longNumber").equalTo(6) { - (first, second, out: Collector[CustomType]) => - for (p <- first) { - for (t <- second) { - Assert.assertTrue(p.nestedPojo.longNumber == t._7) - out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink")) + ( + first: Iterator[CollectionDataSets.POJO], + second: Iterator[(Int, String, Int, Int, Long, String, Long)], + out: Collector[CustomType]) => + for (p <- first) { + for (t <- second) { + Assert.assertTrue(p.nestedPojo.longNumber == t._7) + out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink")) + } } - } } coGroupDs.writeAsText(resultPath, writeMode = WriteMode.OVERWRITE) env.execute() @@ -348,13 +363,16 @@ class CoGroupITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo val ds = CollectionDataSets.getSmallPojoDataSet(env) val ds2 = CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env) val coGroupDs = ds.coGroup(ds2).where(t => new Tuple1(t.nestedPojo.longNumber)).equalTo(6) { - (first, second, out: Collector[CustomType]) => - for (p <- first) { - for (t <- second) { - Assert.assertTrue(p.nestedPojo.longNumber == t._7) - out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink")) + ( + first: Iterator[CollectionDataSets.POJO], + second: Iterator[(Int, String, Int, Int, Long, String, Long)], + out: Collector[CustomType]) => + for (p <- first) { + for (t <- second) { + Assert.assertTrue(p.nestedPojo.longNumber == t._7) + out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink")) + } } - } } coGroupDs.writeAsText(resultPath, writeMode = WriteMode.OVERWRITE) env.execute() @@ -371,13 +389,16 @@ class CoGroupITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo val ds = CollectionDataSets.getSmallPojoDataSet(env) val ds2 = CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env) val coGroupDs = ds.coGroup(ds2).where(_.nestedPojo.longNumber).equalTo(6) { - (first, second, out: Collector[CustomType]) => - for (p <- first) { - for (t <- second) { - Assert.assertTrue(p.nestedPojo.longNumber == t._7) - out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink")) + ( + first: Iterator[CollectionDataSets.POJO], + second: Iterator[(Int, String, Int, Int, Long, String, Long)], + out: Collector[CustomType]) => + for (p <- first) { + for (t <- second) { + Assert.assertTrue(p.nestedPojo.longNumber == t._7) + out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink")) + } } - } } coGroupDs.writeAsText(resultPath, writeMode = WriteMode.OVERWRITE) env.execute() @@ -390,14 +411,17 @@ class CoGroupITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo val ds1 = CollectionDataSets.getSmall3TupleDataSet(env) val ds2 = env.fromElements(0, 1, 2) val coGroupDs = ds1.coGroup(ds2).where(0).equalTo("*") { - (first, second, out: Collector[(Int, Long, String)]) => - for (p <- first) { - for (t <- second) { - if (p._1 == t) { - out.collect(p) + ( + first: Iterator[(Int, Long, String)], + second: Iterator[Int], + out: Collector[(Int, Long, String)]) => + for (p <- first) { + for (t <- second) { + if (p._1 == t) { + out.collect(p) + } } } - } } coGroupDs.writeAsText(resultPath, writeMode = WriteMode.OVERWRITE) @@ -411,14 +435,17 @@ class CoGroupITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo val ds1 = env.fromElements(0, 1, 2) val ds2 = CollectionDataSets.getSmall3TupleDataSet(env) val coGroupDs = ds1.coGroup(ds2).where("*").equalTo(0) { - (first, second, out: Collector[(Int, Long, String)]) => - for (p <- first) { - for (t <- second) { - if (p == t._1) { - out.collect(t) + ( + first: Iterator[Int], + second: Iterator[(Int, Long, String)], + out: Collector[(Int, Long, String)]) => + for (p <- first) { + for (t <- second) { + if (p == t._1) { + out.collect(t) + } } } - } } coGroupDs.writeAsText(resultPath, writeMode = WriteMode.OVERWRITE) diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala index 9d286fe..fabefb9 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala @@ -46,7 +46,10 @@ class GroupCombineITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa ds.combineGroup(new ScalaGroupCombineFunctionExample()) .output(new DiscardingOutputFormat[Tuple1[String]]) - ds.combineGroup((in, out: Collector[Tuple1[String]]) => in.toSet foreach (out.collect)) + ds + .combineGroup( + (in: Iterator[Tuple1[String]], out: Collector[Tuple1[String]]) => + in.toSet foreach (out.collect)) .output(new DiscardingOutputFormat[Tuple1[String]]) // all methods on UnsortedGrouping @@ -55,7 +58,9 @@ class GroupCombineITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa .output(new DiscardingOutputFormat[Tuple1[String]]) ds.groupBy(0) - .combineGroup((in, out: Collector[Tuple1[String]]) => in.toSet foreach (out.collect)) + .combineGroup( + (in: Iterator[Tuple1[String]], out: Collector[Tuple1[String]]) => + in.toSet foreach (out.collect)) .output(new DiscardingOutputFormat[Tuple1[String]]) // all methods on SortedGrouping @@ -64,7 +69,9 @@ class GroupCombineITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa .output(new DiscardingOutputFormat[Tuple1[String]]) ds.groupBy(0).sortGroup(0, Order.ASCENDING) - .combineGroup((in, out: Collector[Tuple1[String]]) => in.toSet foreach (out.collect)) + .combineGroup( + (in: Iterator[Tuple1[String]], out: Collector[Tuple1[String]]) => + in.toSet foreach (out.collect)) .output(new DiscardingOutputFormat[Tuple1[String]]) env.execute() diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala index 385691e..9262cb3 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala @@ -269,17 +269,19 @@ class GroupReduceITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas .map( t => MutableTuple3(t._1, t._2, t._3) ) val reduceDs = ds.groupBy(1).reduceGroup { - (in, out: Collector[MutableTuple3[Int, Long, String]]) => - for (t <- in) { - if (t._1 < 4) { - t._3 = "Hi!" - t._1 += 10 - out.collect(t) - t._1 += 10 - t._3 = "Hi again!" - out.collect(t) + ( + in: Iterator[MutableTuple3[Int, Long, String]], + out: Collector[MutableTuple3[Int, Long, String]]) => + for (t <- in) { + if (t._1 < 4) { + t._3 = "Hi!" + t._1 += 10 + out.collect(t) + t._1 += 10 + t._3 = "Hi again!" + out.collect(t) + } } - } } val result: Seq[String] = reduceDs.collect().map(x => s"${x._1},${x._2},${x._3}").sorted @@ -488,13 +490,15 @@ class GroupReduceITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas val ds = CollectionDataSets.getPojoContainingTupleAndWritable(env) val reduceDs = ds.groupBy("hadoopFan", "theTuple.*").reduceGroup { - (values, out: Collector[Int]) => { - var c: Int = 0 - for (v <- values) { - c += 1 + ( + values: Iterator[CollectionDataSets.PojoContainingTupleAndWritable], + out: Collector[Int]) => { + var c: Int = 0 + for (v <- values) { + c += 1 + } + out.collect(c) } - out.collect(c) - } } val result: Seq[Int] = reduceDs.collect().sorted @@ -511,9 +515,11 @@ class GroupReduceITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas val ds = CollectionDataSets.getTupleContainingPojos(env) val reduceDs = ds.groupBy("_1", "_2.*").reduceGroup { - (values, out: Collector[Int]) => { - out.collect(values.size) - } + ( + values: Iterator[(Int, CollectionDataSets.CrazyNested, CollectionDataSets.POJO)], + out: Collector[Int]) => { + out.collect(values.size) + } } val result: Seq[Int] = reduceDs.collect().sorted @@ -643,20 +649,22 @@ class GroupReduceITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas .sortGroup("theTuple._1", Order.DESCENDING) .sortGroup("theTuple._2", Order.DESCENDING) .reduceGroup { - (values, out: Collector[String]) => { - var once: Boolean = false - val concat: StringBuilder = new StringBuilder - for (value <- values) { - if (!once) { - concat.append(value.hadoopFan.get) - concat.append("---") - once = true + ( + values: Iterator[CollectionDataSets.PojoContainingTupleAndWritable], + out: Collector[String]) => { + var once: Boolean = false + val concat: StringBuilder = new StringBuilder + for (value <- values) { + if (!once) { + concat.append(value.hadoopFan.get) + concat.append("---") + once = true + } + concat.append(value.theTuple) + concat.append("-") } - concat.append(value.theTuple) - concat.append("-") + out.collect(concat.toString()) } - out.collect(concat.toString()) - } } val result: Seq[String] = reduceDs.map(_.toString()).collect().sorted @@ -803,13 +811,15 @@ class GroupReduceITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas val ds = CollectionDataSets.getPojoWithMultiplePojos(env) val reduceDs = ds.groupBy("p2.a2").reduceGroup { - (values, out: Collector[String]) => { - val concat: StringBuilder = new StringBuilder() - for (value <- values) { - concat.append(value.p2.a2) + ( + values: Iterator[CollectionDataSets.PojoWithMultiplePojos], + out: Collector[String]) => { + val concat: StringBuilder = new StringBuilder() + for (value <- values) { + concat.append(value.p2.a2) + } + out.collect(concat.toString()) } - out.collect(concat.toString()) - } } val result : Seq[String] = reduceDs.map(_.toString()).collect().sorted
