pnowojski closed pull request #6367: [FLINK-9850] Add a string to the print 
method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/PrintSinkOutputWriter.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/PrintSinkOutputWriter.java
new file mode 100644
index 00000000000..de058b69f1f
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/PrintSinkOutputWriter.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions.util;
+
+import org.apache.flink.annotation.Internal;
+
+import java.io.PrintStream;
+import java.io.Serializable;
+
+/**
+ * Print sink output writer for DataStream and DataSet print API.
+ */
+@Internal
+public class PrintSinkOutputWriter<IN> implements Serializable {
+
+       private static final long serialVersionUID = 1L;
+
+       private static final boolean STD_OUT = false;
+       private static final boolean STD_ERR = true;
+
+       private final boolean target;
+       private transient PrintStream stream;
+       private final String sinkIdentifier;
+       private transient String completedPrefix;
+
+       public PrintSinkOutputWriter() {
+               this("", STD_OUT);
+       }
+
+       public PrintSinkOutputWriter(final boolean stdErr) {
+               this("", stdErr);
+       }
+
+       public PrintSinkOutputWriter(final String sinkIdentifier, final boolean 
stdErr) {
+               this.target = stdErr;
+               this.sinkIdentifier = (sinkIdentifier == null ? "" : 
sinkIdentifier);
+       }
+
+       public void open(int subtaskIndex, int numParallelSubtasks) {
+               // get the target stream
+               stream = target == STD_OUT ? System.out : System.err;
+
+               completedPrefix = sinkIdentifier;
+
+               if (numParallelSubtasks > 1) {
+                       if (!completedPrefix.isEmpty()) {
+                               completedPrefix += ":";
+                       }
+                       completedPrefix += (subtaskIndex + 1);
+               }
+
+               if (!completedPrefix.isEmpty()) {
+                       completedPrefix += "> ";
+               }
+       }
+
+       public void write(IN record) {
+               stream.println(completedPrefix + record.toString());
+       }
+
+       @Override
+       public String toString() {
+               return "Print to " + (target == STD_OUT ? "System.out" : 
"System.err");
+       }
+}
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java
index 0ab1abb2efb..62eabd0b739 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java
@@ -19,45 +19,46 @@
 package org.apache.flink.api.java.io;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.util.PrintSinkOutputWriter;
 import org.apache.flink.api.common.io.RichOutputFormat;
 import org.apache.flink.configuration.Configuration;
 
-import java.io.PrintStream;
-
 /**
  * Output format that prints results into either stdout or stderr.
- * @param <T>
+ *
+ * <p>
+ * Four possible format options:
+ *     {@code sinkIdentifier}:taskId> output  <- {@code sinkIdentifier} 
provided, parallelism > 1
+ *     {@code sinkIdentifier}> output         <- {@code sinkIdentifier} 
provided, parallelism == 1
+ *  taskId> output                                        <- no {@code 
sinkIdentifier} provided, parallelism > 1
+ *  output                                                <- no {@code 
sinkIdentifier} provided, parallelism == 1
+ * </p>
+ *
+ * @param <T> Input record type
  */
 @PublicEvolving
 public class PrintingOutputFormat<T> extends RichOutputFormat<T> {
 
        private static final long serialVersionUID = 1L;
 
-       private static final boolean STD_OUT = false;
-       private static final boolean STD_ERR = true;
-
-       private String sinkIdentifier;
-
-       private boolean target;
-
-       private transient PrintStream stream;
-
-       private transient String prefix;
+       private final PrintSinkOutputWriter<T> writer;
 
        // 
--------------------------------------------------------------------------------------------
 
        /**
         * Instantiates a printing output format that prints to standard out.
         */
-       public PrintingOutputFormat() {}
+       public PrintingOutputFormat() {
+               writer = new PrintSinkOutputWriter<>(false);
+       }
 
        /**
         * Instantiates a printing output format that prints to standard out.
         *
         * @param stdErr True, if the format should print to standard error 
instead of standard out.
         */
-       public PrintingOutputFormat(boolean stdErr) {
-               this.target = stdErr;
+       public PrintingOutputFormat(final boolean stdErr) {
+               writer = new PrintSinkOutputWriter<>(stdErr);
        }
 
        /**
@@ -65,17 +66,8 @@ public PrintingOutputFormat(boolean stdErr) {
         * @param sinkIdentifier Message that is prefixed to the output of the 
value.
         * @param stdErr True, if the format should print to standard error 
instead of standard out.
         */
-       public PrintingOutputFormat(String sinkIdentifier, boolean stdErr) {
-               this(stdErr);
-               this.sinkIdentifier = sinkIdentifier;
-       }
-
-       public void setTargetToStandardOut() {
-               this.target = STD_OUT;
-       }
-
-       public void setTargetToStandardErr() {
-               this.target = STD_ERR;
+       public PrintingOutputFormat(final String sinkIdentifier, final boolean 
stdErr) {
+               writer = new PrintSinkOutputWriter<>(sinkIdentifier, stdErr);
        }
 
        @Override
@@ -83,46 +75,23 @@ public void configure(Configuration parameters) {}
 
        @Override
        public void open(int taskNumber, int numTasks) {
-               // get the target stream
-               this.stream = this.target == STD_OUT ? System.out : System.err;
-
-               /**
-                * Four possible format options:
-                *      sinkId:taskId> output  <- sink id provided, parallelism 
> 1
-                *      sinkId> output         <- sink id provided, parallelism 
== 1
-                *      taskId> output         <- no sink id provided, 
parallelism > 1
-                *      output                 <- no sink id provided, 
parallelism == 1
-                */
-               if (this.sinkIdentifier != null) {
-                       this.prefix = this.sinkIdentifier;
-                       if (numTasks > 1) {
-                               this.prefix += ":" + (taskNumber + 1);
-                       }
-                       this.prefix += "> ";
-               } else if (numTasks > 1) {
-                       this.prefix = (taskNumber + 1) + "> ";
-               } else {
-                       this.prefix = "";
-               }
-
+               writer.open(taskNumber, numTasks);
        }
 
        @Override
        public void writeRecord(T record) {
-               this.stream.println(this.prefix + record.toString());
+               writer.write(record);
        }
 
        @Override
        public void close() {
-               this.stream = null;
-               this.prefix = null;
-               this.sinkIdentifier = null;
+
        }
 
        // 
--------------------------------------------------------------------------------------------
 
        @Override
        public String toString() {
-               return "Print to " + (target == STD_OUT ? "System.out" : 
"System.err");
+               return writer.toString();
        }
 }
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/io/PrintingOutputFormatTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/io/PrintingOutputFormatTest.java
new file mode 100644
index 00000000000..0656300420f
--- /dev/null
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/io/PrintingOutputFormatTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the {@link PrintingOutputFormat}.
+ */
+public class PrintingOutputFormatTest {
+
+       private final PrintStream originalSystemOut = System.out;
+       private final PrintStream originalSystemErr = System.err;
+
+       private final ByteArrayOutputStream arrayOutputStream = new 
ByteArrayOutputStream();
+       private final ByteArrayOutputStream arrayErrorStream = new 
ByteArrayOutputStream();
+
+       private final String line = System.lineSeparator();
+
+       @Before
+       public void setUp() {
+               System.setOut(new PrintStream(arrayOutputStream));
+               System.setErr(new PrintStream(arrayErrorStream));
+       }
+
+       @After
+       public void tearDown() {
+               if (System.out != originalSystemOut) {
+                       System.out.close();
+               }
+               if (System.err != originalSystemErr) {
+                       System.err.close();
+               }
+               System.setOut(originalSystemOut);
+               System.setErr(originalSystemErr);
+       }
+
+       @Test
+       public void testPrintOutputFormatStdOut() throws Exception {
+               PrintingOutputFormat<String> printSink = new 
PrintingOutputFormat<>();
+               printSink.open(0, 1);
+
+               printSink.writeRecord("hello world!");
+
+               assertEquals("Print to System.out", printSink.toString());
+               assertEquals("hello world!" + line, 
arrayOutputStream.toString());
+               printSink.close();
+       }
+
+       @Test
+       public void testPrintOutputFormatStdErr() throws Exception {
+               PrintingOutputFormat<String> printSink = new 
PrintingOutputFormat<>(true);
+               printSink.open(0, 1);
+
+               printSink.writeRecord("hello world!");
+
+               assertEquals("Print to System.err", printSink.toString());
+               assertEquals("hello world!" + line, 
arrayErrorStream.toString());
+               printSink.close();
+       }
+
+       @Test
+       public void testPrintOutputFormatWithPrefix() throws Exception {
+               PrintingOutputFormat<String> printSink = new 
PrintingOutputFormat<>();
+               printSink.open(1, 2);
+
+               printSink.writeRecord("hello world!");
+
+               assertEquals("Print to System.out", printSink.toString());
+               assertEquals("2> hello world!" + line, 
arrayOutputStream.toString());
+               printSink.close();
+       }
+
+       @Test
+       public void testPrintOutputFormatWithIdentifierAndPrefix() throws 
Exception {
+               PrintingOutputFormat<String> printSink = new 
PrintingOutputFormat<>("mySink", false);
+               printSink.open(1, 2);
+
+               printSink.writeRecord("hello world!");
+
+               assertEquals("Print to System.out", printSink.toString());
+               assertEquals("mySink:2> hello world!" + line, 
arrayOutputStream.toString());
+               printSink.close();
+       }
+
+       @Test
+       public void testPrintOutputFormatWithIdentifierButNoPrefix() throws 
Exception {
+               PrintingOutputFormat<String> printSink = new 
PrintingOutputFormat<>("mySink", false);
+               printSink.open(0, 1);
+
+               printSink.writeRecord("hello world!");
+
+               assertEquals("Print to System.out", printSink.toString());
+               assertEquals("mySink> hello world!" + line, 
arrayOutputStream.toString());
+               printSink.close();
+       }
+
+}
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 8e24ad73558..271d9bec33f 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -973,6 +973,40 @@ public ExecutionConfig getExecutionConfig() {
                return addSink(printFunction).name("Print to Std. Err");
        }
 
+       /**
+        * Writes a DataStream to the standard output stream (stdout).
+        *
+        * <p>For each element of the DataStream the result of {@link 
Object#toString()} is written.
+        *
+        * <p>NOTE: This will print to stdout on the machine where the code is 
executed, i.e. the Flink
+        * worker.
+        *
+        * @param sinkIdentifier The string to prefix the output with.
+        * @return The closed DataStream.
+        */
+       @PublicEvolving
+       public DataStreamSink<T> print(String sinkIdentifier) {
+               PrintSinkFunction<T> printFunction = new 
PrintSinkFunction<>(sinkIdentifier, false);
+               return addSink(printFunction).name("Print to Std. Out");
+       }
+
+       /**
+        * Writes a DataStream to the standard output stream (stderr).
+        *
+        * <p>For each element of the DataStream the result of {@link 
Object#toString()} is written.
+        *
+        * <p>NOTE: This will print to stderr on the machine where the code is 
executed, i.e. the Flink
+        * worker.
+        *
+        * @param sinkIdentifier The string to prefix the output with.
+        * @return The closed DataStream.
+        */
+       @PublicEvolving
+       public DataStreamSink<T> printToErr(String sinkIdentifier) {
+               PrintSinkFunction<T> printFunction = new 
PrintSinkFunction<>(sinkIdentifier, true);
+               return addSink(printFunction).name("Print to Std. Err");
+       }
+
        /**
         * Writes a DataStream to the file specified by path in text format.
         *
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java
index e646663ce6c..f0ced9e6d6d 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java
@@ -18,81 +18,71 @@
 package org.apache.flink.streaming.api.functions.sink;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.util.PrintSinkOutputWriter;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 
-import java.io.PrintStream;
-
 /**
  * Implementation of the SinkFunction writing every tuple to the standard
  * output or standard error stream.
  *
- * @param <IN>
- *            Input record type
+ * <p>
+ * Four possible format options:
+ *     {@code sinkIdentifier}:taskId> output  <- {@code sinkIdentifier} 
provided, parallelism > 1
+ *     {@code sinkIdentifier}> output         <- {@code sinkIdentifier} 
provided, parallelism == 1
+ *  taskId> output                                        <- no {@code 
sinkIdentifier} provided, parallelism > 1
+ *  output                                                <- no {@code 
sinkIdentifier} provided, parallelism == 1
+ * </p>
+ *
+ * @param <IN> Input record type
  */
 @PublicEvolving
 public class PrintSinkFunction<IN> extends RichSinkFunction<IN> {
-       private static final long serialVersionUID = 1L;
 
-       private static final boolean STD_OUT = false;
-       private static final boolean STD_ERR = true;
+       private static final long serialVersionUID = 1L;
 
-       private boolean target;
-       private transient PrintStream stream;
-       private transient String prefix;
+       private final PrintSinkOutputWriter<IN> writer;
 
        /**
         * Instantiates a print sink function that prints to standard out.
         */
-       public PrintSinkFunction() {}
+       public PrintSinkFunction() {
+               writer = new PrintSinkOutputWriter<>(false);
+       }
 
        /**
         * Instantiates a print sink function that prints to standard out.
         *
         * @param stdErr True, if the format should print to standard error 
instead of standard out.
         */
-       public PrintSinkFunction(boolean stdErr) {
-               target = stdErr;
-       }
-
-       public void setTargetToStandardOut() {
-               target = STD_OUT;
+       public PrintSinkFunction(final boolean stdErr) {
+               writer = new PrintSinkOutputWriter<>(stdErr);
        }
 
-       public void setTargetToStandardErr() {
-               target = STD_ERR;
+       /**
+        * Instantiates a print sink function that prints to standard out and 
gives a sink identifier.
+        *
+        * @param stdErr True, if the format should print to standard error 
instead of standard out.
+        * @param sinkIdentifier Message that identify sink and is prefixed to 
the output of the value
+        */
+       public PrintSinkFunction(final String sinkIdentifier, final boolean 
stdErr) {
+               writer = new PrintSinkOutputWriter<>(sinkIdentifier, stdErr);
        }
 
        @Override
        public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                StreamingRuntimeContext context = (StreamingRuntimeContext) 
getRuntimeContext();
-               // get the target stream
-               stream = target == STD_OUT ? System.out : System.err;
-
-               // set the prefix if we have a >1 parallelism
-               prefix = (context.getNumberOfParallelSubtasks() > 1) ?
-                               ((context.getIndexOfThisSubtask() + 1) + "> ") 
: null;
+               writer.open(context.getIndexOfThisSubtask(), 
context.getNumberOfParallelSubtasks());
        }
 
        @Override
        public void invoke(IN record) {
-               if (prefix != null) {
-                       stream.println(prefix + record.toString());
-               }
-               else {
-                       stream.println(record.toString());
-               }
-       }
-
-       @Override
-       public void close() {
-               this.stream = null;
-               this.prefix = null;
+               writer.write(record);
        }
 
        @Override
        public String toString() {
-               return "Print to " + (target == STD_OUT ? "System.out" : 
"System.err");
+               return writer.toString();
        }
 }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
index 4d2d6e1f6a5..aa6774dab8f 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
@@ -971,6 +971,10 @@ public boolean filter(Integer value) throws Exception {
                StreamEdge splitEdge = 
env.getStreamGraph().getStreamEdges(unionFilter.getId(), 
sink.getTransformation().getId()).get(0);
                assertEquals("a", splitEdge.getSelectedNames().get(0));
 
+               DataStreamSink<Integer> sinkWithIdentifier = 
select.print("identifier");
+               StreamEdge newSplitEdge = 
env.getStreamGraph().getStreamEdges(unionFilter.getId(), 
sinkWithIdentifier.getTransformation().getId()).get(0);
+               assertEquals("a", newSplitEdge.getSelectedNames().get(0));
+
                ConnectedStreams<Integer, Integer> connect = 
map.connect(flatMap);
                CoMapFunction<Integer, Integer, String> coMapper = new 
CoMapFunction<Integer, Integer, String>() {
                        private static final long serialVersionUID = 1L;
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java
index 4a163387b9e..bef044022dc 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java
@@ -66,7 +66,7 @@ public void tearDown() {
        public void testPrintSinkStdOut() throws Exception {
                PrintSinkFunction<String> printSink = new PrintSinkFunction<>();
                printSink.setRuntimeContext(new 
MockStreamingRuntimeContext(false, 1, 0));
-               printSink.setTargetToStandardOut();
+
                printSink.open(new Configuration());
 
                printSink.invoke("hello world!", 
SinkContextUtil.forTimestamp(0));
@@ -78,9 +78,8 @@ public void testPrintSinkStdOut() throws Exception {
 
        @Test
        public void testPrintSinkStdErr() throws Exception {
-               PrintSinkFunction<String> printSink = new PrintSinkFunction<>();
+               PrintSinkFunction<String> printSink = new 
PrintSinkFunction<>(true);
                printSink.setRuntimeContext(new 
MockStreamingRuntimeContext(false, 1, 0));
-               printSink.setTargetToStandardErr();
                printSink.open(new Configuration());
 
                printSink.invoke("hello world!", 
SinkContextUtil.forTimestamp(0));
@@ -94,7 +93,6 @@ public void testPrintSinkStdErr() throws Exception {
        public void testPrintSinkWithPrefix() throws Exception {
                PrintSinkFunction<String> printSink = new PrintSinkFunction<>();
                printSink.setRuntimeContext(new 
MockStreamingRuntimeContext(false, 2, 1));
-               printSink.setTargetToStandardOut();
                printSink.open(new Configuration());
 
                printSink.invoke("hello world!", 
SinkContextUtil.forTimestamp(0));
@@ -103,4 +101,31 @@ public void testPrintSinkWithPrefix() throws Exception {
                assertEquals("2> hello world!" + line, 
arrayOutputStream.toString());
                printSink.close();
        }
+
+       @Test
+       public void testPrintSinkWithIdentifierAndPrefix() throws Exception {
+               PrintSinkFunction<String> printSink = new 
PrintSinkFunction<>("mySink", false);
+               printSink.setRuntimeContext(new 
MockStreamingRuntimeContext(false, 2, 1));
+               printSink.open(new Configuration());
+
+               printSink.invoke("hello world!", 
SinkContextUtil.forTimestamp(0));
+
+               assertEquals("Print to System.out", printSink.toString());
+               assertEquals("mySink:2> hello world!" + line, 
arrayOutputStream.toString());
+               printSink.close();
+       }
+
+       @Test
+       public void testPrintSinkWithIdentifierButNoPrefix() throws Exception {
+               PrintSinkFunction<String> printSink = new 
PrintSinkFunction<>("mySink", false);
+               printSink.setRuntimeContext(new 
MockStreamingRuntimeContext(false, 1, 0));
+               printSink.open(new Configuration());
+
+               printSink.invoke("hello world!", 
SinkContextUtil.forTimestamp(0));
+
+               assertEquals("Print to System.out", printSink.toString());
+               assertEquals("mySink> hello world!" + line, 
arrayOutputStream.toString());
+               printSink.close();
+       }
+
 }
diff --git 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 3a888299111..23260cca787 100644
--- 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -959,6 +959,29 @@ class DataStream[T](stream: JavaStream[T]) {
   @PublicEvolving
   def printToErr() = stream.printToErr()
 
+  /**
+    * Writes a DataStream to the standard output stream (stdout). For each
+    * element of the DataStream the result of [[AnyRef.toString()]] is
+    * written.
+    *
+    * @param sinkIdentifier The string to prefix the output with.
+    * @return The closed DataStream.
+    */
+  @PublicEvolving
+  def print(sinkIdentifier: String): DataStreamSink[T] = 
stream.print(sinkIdentifier)
+
+  /**
+    * Writes a DataStream to the standard output stream (stderr).
+    *
+    * For each element of the DataStream the result of
+    * [[AnyRef.toString()]] is written.
+    *
+    * @param sinkIdentifier The string to prefix the output with.
+    * @return The closed DataStream.
+    */
+  @PublicEvolving
+  def printToErr(sinkIdentifier: String) = stream.printToErr(sinkIdentifier)
+
   /**
     * Writes a DataStream to the file specified by path in text format. For
     * every element of the DataStream the result of .toString is written.
diff --git 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
index 9e1c4939365..7b7dde5edae 100644
--- 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
+++ 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
@@ -579,6 +579,12 @@ class DataStreamTest extends AbstractTestBase {
       env.getStreamGraph.getStreamEdges(unionFilter.getId, 
sink.getTransformation.getId)
     assert("a" == splitEdge.get(0).getSelectedNames.get(0))
 
+    val sinkWithIdentifier = select.print("identifier")
+    val newSplitEdge = env.getStreamGraph.getStreamEdges(
+      unionFilter.getId,
+      sinkWithIdentifier.getTransformation.getId)
+    assert("a" == newSplitEdge.get(0).getSelectedNames.get(0))
+
     val foldFunction = new FoldFunction[Int, String] {
       override def fold(accumulator: String, value: Int): String = ""
     }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to