Repository: flink
Updated Branches:
  refs/heads/master fad979532 -> 11643c0cc


[FLINK-2070] [core] Deprecate "print(prefix)" methods and add 
"printOnTaskManager(prefix)" method.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/11643c0c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/11643c0c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/11643c0c

Branch: refs/heads/master
Commit: 11643c0cc79eabe02e952e6fbd56d7a55166b623
Parents: fad9795
Author: Stephan Ewen <[email protected]>
Authored: Wed Jun 3 22:52:00 2015 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Thu Jun 4 00:10:15 2015 +0200

----------------------------------------------------------------------
 .../java/org/apache/flink/api/java/DataSet.java | 69 ++++++++++++++------
 .../org/apache/flink/api/scala/DataSet.scala    | 26 ++++++++
 2 files changed, 76 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/11643c0c/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java 
b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index 157c666..f1eed3e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -377,18 +377,15 @@ public abstract class DataSet<T> {
        }
 
        /**
-        * Convenience method to get the count (number of elements) of a DataSet
+        * Convenience method to get the count (number of elements) of a 
DataSet.
         *
-        * @return A long integer that represents the number of elements in the 
set
-        *
-        * @see org.apache.flink.api.java.Utils.CountHelper
+        * @return A long integer that represents the number of elements in the 
data set.
         */
        public long count() throws Exception {
-
                final String id = new AbstractID().toString();
 
-               flatMap(new Utils.CountHelper<T>(id)).output(
-                               new DiscardingOutputFormat<Long>());
+               flatMap(new Utils.CountHelper<T>(id)).name("count()")
+                               .output(new 
DiscardingOutputFormat<Long>()).name("count() sink");
 
                JobExecutionResult res = getExecutionEnvironment().execute();
                return res.<Long> getAccumulatorResult(id);
@@ -396,18 +393,17 @@ public abstract class DataSet<T> {
 
 
        /**
-        * Convenience method to get the elements of a DataSet as a List
+        * Convenience method to get the elements of a DataSet as a List.
         * As DataSet can contain a lot of data, this method should be used 
with caution.
         *
         * @return A List containing the elements of the DataSet
-        *
-        * @see org.apache.flink.api.java.Utils.CollectHelper
         */
        public List<T> collect() throws Exception {
                final String id = new AbstractID().toString();
                final TypeSerializer<T> serializer = 
getType().createSerializer(getExecutionEnvironment().getConfig());
                
-               this.flatMap(new Utils.CollectHelper<T>(id, 
serializer)).output(new DiscardingOutputFormat<T>());
+               this.flatMap(new Utils.CollectHelper<T>(id, 
serializer)).name("collect()")
+                               .output(new 
DiscardingOutputFormat<T>()).name("collect() sink");
                JobExecutionResult res = getExecutionEnvironment().execute();
 
                ArrayList<byte[]> accResult = res.getAccumulatorResult(id);
@@ -1341,9 +1337,12 @@ public abstract class DataSet<T> {
         * 
         * <p>This method immediately triggers the program execution, similar 
to the
         * {@link #collect()} and {@link #count()} methods.</p>
+        * 
+        * @see #printToErr()
+        * @see #printOnTaskManager(String)
         */
        public void print() throws Exception {
-               List<T> elements = this.collect();
+               List<T> elements = collect();
                for (T e: elements) {
                        System.out.println(e);
                }
@@ -1358,32 +1357,64 @@ public abstract class DataSet<T> {
         *
         * <p>This method immediately triggers the program execution, similar 
to the
         * {@link #collect()} and {@link #count()} methods.</p>
+        * 
+        * @see #print()
+        * @see #printOnTaskManager(String)
         */
        public void printToErr() throws Exception {
-               List<T> elements = this.collect();
+               List<T> elements = collect();
                for (T e: elements) {
                        System.err.println(e);
                }
        }
+
+       /**
+        * Writes a DataSet to the standard output streams (stdout) of the 
TaskManagers that execute
+        * the program (or more specifically, the data sink operators). On a 
typical cluster setup, the
+        * data will appear in the TaskManagers' <i>.out</i> files.
+        * 
+        * <p>To print the data to the console or stdout stream of the client 
process instead, use the
+        * {@link #print()} method.</p>
+        * 
+        * <p>For each element of the DataSet the result of {@link 
Object#toString()} is written.</p>
+        *
+        * @param prefix The string to prefix each line of the output with. 
This helps identify outputs
+        *               from different printing sinks.   
+        * @return The DataSink operator that writes the DataSet.
+        *  
+        * @see #print()
+        */
+       public DataSink<T> printOnTaskManager(String prefix) {
+               return output(new PrintingOutputFormat<T>(prefix, false));
+       }
        
        /**
-        * Writes a DataSet to the standard output stream (stdout).<br/>
-        * For each element of the DataSet the result of {@link 
Object#toString()} is written.
+        * Writes a DataSet to the standard output stream (stdout).
+        * 
+        * <p>For each element of the DataSet the result of {@link 
Object#toString()} is written.</p>
         *
-        *  @param sinkIdentifier The string to prefix the output with.
-        *  @return The DataSink that writes the DataSet.
+        * @param sinkIdentifier The string to prefix the output with.
+        * @return The DataSink that writes the DataSet.
+        * 
+        * @deprecated Use {@link #printOnTaskManager(String)} instead.
         */
+       @Deprecated
        public DataSink<T> print(String sinkIdentifier) {
                return output(new PrintingOutputFormat<T>(sinkIdentifier, 
false));
        }
 
        /**
-        * Writes a DataSet to the standard error stream (stderr).<br/>
-        * For each element of the DataSet the result of {@link 
Object#toString()} is written.
+        * Writes a DataSet to the standard error stream (stderr).
+        * 
+        * <p>For each element of the DataSet the result of {@link 
Object#toString()} is written.</p>
         *
         * @param sinkIdentifier The string to prefix the output with.
         * @return The DataSink that writes the DataSet.
+        * 
+        * @deprecated Use {@link #printOnTaskManager(String)} instead, or use the 
+        *             {@link PrintingOutputFormat} directly.
         */
+       @Deprecated
        public DataSink<T> printToErr(String sinkIdentifier) {
                return output(new PrintingOutputFormat<T>(sinkIdentifier, 
true));
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/11643c0c/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
index e283e95..2ade2bc 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
@@ -1349,13 +1349,35 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
   def printToErr(): Unit = {
     javaSet.printToErr()
   }
+
+  /**
+   * Writes a DataSet to the standard output streams (stdout) of the 
TaskManagers that execute
+   * the program (or more specifically, the data sink operators). On a typical 
cluster setup, the
+   * data will appear in the TaskManagers' <i>.out</i> files.
+   *
+   * To print the data to the console or stdout stream of the client process 
instead, use the
+   * [[print()]] method.
+   *
+   * For each element of the DataSet the result of [[AnyRef.toString()]] is 
written.
+   *
+   * @param prefix The string to prefix each line of the output with. This 
helps identify outputs
+   *               from different printing sinks.   
+   * @return The DataSink operator that writes the DataSet.
+   */
+  def printOnTaskManager(prefix: String): DataSink[T] = {
+    javaSet.printOnTaskManager(prefix)
+  }
   
   /**
    * *
    * Writes a DataSet to the standard output stream (stdout) with a sink 
identifier prefixed.
    * This uses [[AnyRef.toString]] on each element.
    * @param sinkIdentifier The string to prefix the output with.
+   * 
+   * @deprecated Use [[printOnTaskManager(String)]] instead.
    */
+  @Deprecated
+  @deprecated
   def print(sinkIdentifier: String): DataSink[T] = {
     output(new PrintingOutputFormat[T](sinkIdentifier, false))
   }
@@ -1364,7 +1386,11 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
    * Writes a DataSet to the standard error stream (stderr) with a sink 
identifier prefixed.
    * This uses [[AnyRef.toString]] on each element.
    * @param sinkIdentifier The string to prefix the output with.
+   * 
+   * @deprecated Use [[printOnTaskManager(String)]] instead.
    */
+  @Deprecated
+  @deprecated
   def printToErr(sinkIdentifier: String): DataSink[T] = {
       output(new PrintingOutputFormat[T](sinkIdentifier, true))
   }

Reply via email to