Repository: flink Updated Branches: refs/heads/master fad979532 -> 11643c0cc
[FLINK-2070] [core] Deprecate "print(prefix)" methods and add "printOnTaskManager(prefix)" method. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/11643c0c Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/11643c0c Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/11643c0c Branch: refs/heads/master Commit: 11643c0cc79eabe02e952e6fbd56d7a55166b623 Parents: fad9795 Author: Stephan Ewen <[email protected]> Authored: Wed Jun 3 22:52:00 2015 +0200 Committer: Stephan Ewen <[email protected]> Committed: Thu Jun 4 00:10:15 2015 +0200 ---------------------------------------------------------------------- .../java/org/apache/flink/api/java/DataSet.java | 69 ++++++++++++++------ .../org/apache/flink/api/scala/DataSet.scala | 26 ++++++++ 2 files changed, 76 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/11643c0c/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java index 157c666..f1eed3e 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java @@ -377,18 +377,15 @@ public abstract class DataSet<T> { } /** - * Convenience method to get the count (number of elements) of a DataSet + * Convenience method to get the count (number of elements) of a DataSet. * - * @return A long integer that represents the number of elements in the set - * - * @see org.apache.flink.api.java.Utils.CountHelper + * @return A long integer that represents the number of elements in the data set. 
*/ public long count() throws Exception { - final String id = new AbstractID().toString(); - flatMap(new Utils.CountHelper<T>(id)).output( - new DiscardingOutputFormat<Long>()); + flatMap(new Utils.CountHelper<T>(id)).name("count()") + .output(new DiscardingOutputFormat<Long>()).name("count() sink"); JobExecutionResult res = getExecutionEnvironment().execute(); return res.<Long> getAccumulatorResult(id); @@ -396,18 +393,17 @@ public abstract class DataSet<T> { /** - * Convenience method to get the elements of a DataSet as a List + * Convenience method to get the elements of a DataSet as a List. * As DataSet can contain a lot of data, this method should be used with caution. * * @return A List containing the elements of the DataSet - * - * @see org.apache.flink.api.java.Utils.CollectHelper */ public List<T> collect() throws Exception { final String id = new AbstractID().toString(); final TypeSerializer<T> serializer = getType().createSerializer(getExecutionEnvironment().getConfig()); - this.flatMap(new Utils.CollectHelper<T>(id, serializer)).output(new DiscardingOutputFormat<T>()); + this.flatMap(new Utils.CollectHelper<T>(id, serializer)).name("collect()") + .output(new DiscardingOutputFormat<T>()).name("collect() sink"); JobExecutionResult res = getExecutionEnvironment().execute(); ArrayList<byte[]> accResult = res.getAccumulatorResult(id); @@ -1341,9 +1337,12 @@ public abstract class DataSet<T> { * * <p>This method immediately triggers the program execution, similar to the * {@link #collect()} and {@link #count()} methods.</p> + * + * @see #printToErr() + * @see #printOnTaskManager(String) */ public void print() throws Exception { - List<T> elements = this.collect(); + List<T> elements = collect(); for (T e: elements) { System.out.println(e); } @@ -1358,32 +1357,64 @@ public abstract class DataSet<T> { * * <p>This method immediately triggers the program execution, similar to the * {@link #collect()} and {@link #count()} methods.</p> + * + * @see #print() + * @see 
#printOnTaskManager(String) */ public void printToErr() throws Exception { - List<T> elements = this.collect(); + List<T> elements = collect(); for (T e: elements) { System.err.println(e); } } + + /** + * Writes a DataSet to the standard output streams (stdout) of the TaskManagers that execute + * the program (or more specifically, the data sink operators). On a typical cluster setup, the + * data will appear in the TaskManagers' <i>.out</i> files. + * + * <p>To print the data to the console or stdout stream of the client process instead, use the + * {@link #print()} method.</p> + * + * <p>For each element of the DataSet the result of {@link Object#toString()} is written.</p> + * + * @param prefix The string to prefix each line of the output with. This helps identifying outputs + * from different printing sinks. + * @return The DataSink operator that writes the DataSet. + * + * @see #print() + */ + public DataSink<T> printOnTaskManager(String prefix) { + return output(new PrintingOutputFormat<T>(prefix, false)); + } /** - * Writes a DataSet to the standard output stream (stdout).<br/> - * For each element of the DataSet the result of {@link Object#toString()} is written. + * Writes a DataSet to the standard output stream (stdout). + * + * <p>For each element of the DataSet the result of {@link Object#toString()} is written.</p> * - * @param sinkIdentifier The string to prefix the output with. - * @return The DataSink that writes the DataSet. + * @param sinkIdentifier The string to prefix the output with. + * @return The DataSink that writes the DataSet. + * + * @deprecated Use {@link #printOnTaskManager(String)} instead. */ + @Deprecated public DataSink<T> print(String sinkIdentifier) { return output(new PrintingOutputFormat<T>(sinkIdentifier, false)); } /** - * Writes a DataSet to the standard error stream (stderr).<br/> - * For each element of the DataSet the result of {@link Object#toString()} is written. 
+ * Writes a DataSet to the standard error stream (stderr). + * + * <p>For each element of the DataSet the result of {@link Object#toString()} is written.</p> * * @param sinkIdentifier The string to prefix the output with. * @return The DataSink that writes the DataSet. + * + * @deprecated Use {@link #printOnTaskManager(String)} instead, or the + * {@link PrintingOutputFormat}. */ + @Deprecated public DataSink<T> printToErr(String sinkIdentifier) { return output(new PrintingOutputFormat<T>(sinkIdentifier, true)); } http://git-wip-us.apache.org/repos/asf/flink/blob/11643c0c/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala index e283e95..2ade2bc 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala @@ -1349,13 +1349,35 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) { def printToErr(): Unit = { javaSet.printToErr() } + + /** + * Writes a DataSet to the standard output streams (stdout) of the TaskManagers that execute + * the program (or more specifically, the data sink operators). On a typical cluster setup, the + * data will appear in the TaskManagers' <i>.out</i> files. + * + * To print the data to the console or stdout stream of the client process instead, use the + * [[print()]] method. + * + * For each element of the DataSet the result of [[AnyRef.toString()]] is written. + * + * @param prefix The string to prefix each line of the output with. This helps identifying outputs + * from different printing sinks. + * @return The DataSink operator that writes the DataSet. 
+ */ + def printOnTaskManager(prefix: String): DataSink[T] = { + javaSet.printOnTaskManager(prefix) + } /** * * * Writes a DataSet to the standard output stream (stdout) with a sink identifier prefixed. * This uses [[AnyRef.toString]] on each element. * @param sinkIdentifier The string to prefix the output with. + * + * @deprecated Use [[printOnTaskManager(String)]] instead. */ + @Deprecated + @deprecated def print(sinkIdentifier: String): DataSink[T] = { output(new PrintingOutputFormat[T](sinkIdentifier, false)) } @@ -1364,7 +1386,11 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) { * Writes a DataSet to the standard error stream (stderr) with a sink identifier prefixed. * This uses [[AnyRef.toString]] on each element. * @param sinkIdentifier The string to prefix the output with. + * + * @deprecated Use [[printOnTaskManager(String)]] instead. */ + @Deprecated + @deprecated def printToErr(sinkIdentifier: String): DataSink[T] = { output(new PrintingOutputFormat[T](sinkIdentifier, true)) }
