[FLINK-3219] [java scala] Implement DataSet.count and DataSet.collect using a single operator


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e9a53587
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e9a53587
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e9a53587

Branch: refs/heads/master
Commit: e9a535877906bd75df3e633e3c5dad556b9c925d
Parents: 0937be0
Author: Greg Hogan <c...@greghogan.com>
Authored: Mon Jan 11 17:04:31 2016 -0500
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Jan 15 11:44:21 2016 +0100

----------------------------------------------------------------------
 .../java/org/apache/flink/api/java/DataSet.java |  7 ++---
 .../java/org/apache/flink/api/java/Utils.java   | 33 +++++++++++++-------
 .../org/apache/flink/api/scala/DataSet.scala    |  7 ++---
 .../jsonplan/JsonJobGraphGenerationTest.java    |  2 +-
 4 files changed, 28 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e9a53587/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index c5a636c..be84032 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -46,7 +46,6 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.functions.SelectByMaxFunction;
 import org.apache.flink.api.java.functions.SelectByMinFunction;
 import org.apache.flink.api.java.io.CsvOutputFormat;
-import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.io.PrintingOutputFormat;
 import org.apache.flink.api.java.io.TextOutputFormat;
 import org.apache.flink.api.java.io.TextOutputFormat.TextFormatter;
@@ -387,8 +386,7 @@ public abstract class DataSet<T> {
        public long count() throws Exception {
                final String id = new AbstractID().toString();
 
-               flatMap(new Utils.CountHelper<T>(id)).name("count()")
-                               .output(new DiscardingOutputFormat<Long>()).name("count() sink");
+               output(new Utils.CountHelper<T>(id)).name("count()");
 
                JobExecutionResult res = getExecutionEnvironment().execute();
                return res.<Long> getAccumulatorResult(id);
@@ -405,8 +403,7 @@ public abstract class DataSet<T> {
                final String id = new AbstractID().toString();
                final TypeSerializer<T> serializer = getType().createSerializer(getExecutionEnvironment().getConfig());
                
-               this.flatMap(new Utils.CollectHelper<>(id, serializer)).name("collect()")
-                               .output(new DiscardingOutputFormat<T>()).name("collect() sink");
+               this.output(new Utils.CollectHelper<>(id, serializer)).name("collect()");
                JobExecutionResult res = getExecutionEnvironment().execute();
 
                ArrayList<byte[]> accResult = res.getAccumulatorResult(id);

http://git-wip-us.apache.org/repos/asf/flink/blob/e9a53587/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/Utils.java b/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
index 665f35f..cb10906 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
@@ -20,20 +20,19 @@ package org.apache.flink.api.java;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.accumulators.SerializedListAccumulator;
+import org.apache.flink.api.common.io.RichOutputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 
+import java.io.IOException;
 import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
 import java.util.List;
 import java.util.Random;
 
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.Collector;
 import static org.apache.flink.api.java.functions.FunctionAnnotation.SkipCodeAnalysis;
 
 /**
@@ -78,7 +77,7 @@ public final class Utils {
        }
 
        @SkipCodeAnalysis
-       public static class CountHelper<T> extends RichFlatMapFunction<T, Long> {
+       public static class CountHelper<T> extends RichOutputFormat<T> {
 
                private static final long serialVersionUID = 1L;
 
@@ -91,18 +90,26 @@ public final class Utils {
                }
 
                @Override
-               public void flatMap(T value, Collector<Long> out) throws Exception {
+               public void configure(Configuration parameters) {
+               }
+
+               @Override
+               public void open(int taskNumber, int numTasks) throws IOException {
+               }
+
+               @Override
+               public void writeRecord(T record) throws IOException {
                        counter++;
                }
 
                @Override
-               public void close() throws Exception {
+               public void close() throws IOException {
                        getRuntimeContext().getLongCounter(id).add(counter);
                }
        }
 
        @SkipCodeAnalysis
-       public static class CollectHelper<T> extends RichFlatMapFunction<T, T> {
+       public static class CollectHelper<T> extends RichOutputFormat<T> {
 
                private static final long serialVersionUID = 1L;
 
@@ -117,17 +124,21 @@ public final class Utils {
                }
 
                @Override
-               public void open(Configuration parameters) throws Exception {
+               public void configure(Configuration parameters) {
+               }
+
+               @Override
+               public void open(int taskNumber, int numTasks) throws IOException {
                        this.accumulator = new SerializedListAccumulator<>();
                }
 
                @Override
-               public void flatMap(T value, Collector<T> out) throws Exception {
-                       accumulator.add(value, serializer);
+               public void writeRecord(T record) throws IOException {
+                       accumulator.add(record, serializer);
                }
 
                @Override
-               public void close() throws Exception {
+               public void close() throws IOException {
                        // Important: should only be added in close method to minimize traffic of accumulators
                        getRuntimeContext().addAccumulator(id, accumulator);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/e9a53587/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
index a3ce53c..396ee90 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
@@ -30,7 +30,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.Utils.CountHelper
 import org.apache.flink.api.java.aggregation.Aggregations
 import org.apache.flink.api.java.functions.{FirstReducer, KeySelector}
-import org.apache.flink.api.java.io.{DiscardingOutputFormat, PrintingOutputFormat, TextOutputFormat}
+import org.apache.flink.api.java.io.{PrintingOutputFormat, TextOutputFormat}
 import org.apache.flink.api.java.operators.Keys.ExpressionKeys
 import org.apache.flink.api.java.operators._
 import org.apache.flink.api.java.operators.join.JoinType
@@ -521,7 +521,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
   @throws(classOf[Exception])
   def count(): Long = {
     val id = new AbstractID().toString
-    javaSet.flatMap(new CountHelper[T](id)).output(new DiscardingOutputFormat[java.lang.Long])
+    javaSet.output(new CountHelper[T](id))
     val res = getExecutionEnvironment.execute()
     res.getAccumulatorResult[Long](id)
   }
@@ -539,8 +539,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
     val id = new AbstractID().toString
     val serializer = getType().createSerializer(getExecutionEnvironment.getConfig)
     
-    javaSet.flatMap(new Utils.CollectHelper[T](id, serializer))
-           .output(new DiscardingOutputFormat[T])
+    javaSet.output(new Utils.CollectHelper[T](id, serializer))
     
     val res = getExecutionEnvironment.execute()
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e9a53587/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/JsonJobGraphGenerationTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/JsonJobGraphGenerationTest.java b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/JsonJobGraphGenerationTest.java
index a862242..a9ade6a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/JsonJobGraphGenerationTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/JsonJobGraphGenerationTest.java
@@ -183,7 +183,7 @@ public class JsonJobGraphGenerationTest {
                        // without arguments
                        try {
                                final int parallelism = 1; // some ops have DOP 1 forced
-                               JsonValidator validator = new GenericValidator(parallelism, 10);
+                               JsonValidator validator = new GenericValidator(parallelism, 9);
                                TestingExecutionEnvironment.setAsNext(validator, parallelism);
 
                                ConnectedComponents.main();

Reply via email to