pnowojski closed pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/PrintSinkOutputWriter.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/PrintSinkOutputWriter.java new file mode 100644 index 00000000000..de058b69f1f --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/PrintSinkOutputWriter.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.functions.util; + +import org.apache.flink.annotation.Internal; + +import java.io.PrintStream; +import java.io.Serializable; + +/** + * Print sink output writer for DataStream and DataSet print API. 
+ */ +@Internal +public class PrintSinkOutputWriter<IN> implements Serializable { + + private static final long serialVersionUID = 1L; + + private static final boolean STD_OUT = false; + private static final boolean STD_ERR = true; + + private final boolean target; + private transient PrintStream stream; + private final String sinkIdentifier; + private transient String completedPrefix; + + public PrintSinkOutputWriter() { + this("", STD_OUT); + } + + public PrintSinkOutputWriter(final boolean stdErr) { + this("", stdErr); + } + + public PrintSinkOutputWriter(final String sinkIdentifier, final boolean stdErr) { + this.target = stdErr; + this.sinkIdentifier = (sinkIdentifier == null ? "" : sinkIdentifier); + } + + public void open(int subtaskIndex, int numParallelSubtasks) { + // get the target stream + stream = target == STD_OUT ? System.out : System.err; + + completedPrefix = sinkIdentifier; + + if (numParallelSubtasks > 1) { + if (!completedPrefix.isEmpty()) { + completedPrefix += ":"; + } + completedPrefix += (subtaskIndex + 1); + } + + if (!completedPrefix.isEmpty()) { + completedPrefix += "> "; + } + } + + public void write(IN record) { + stream.println(completedPrefix + record.toString()); + } + + @Override + public String toString() { + return "Print to " + (target == STD_OUT ? 
"System.out" : "System.err"); + } +} diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java index 0ab1abb2efb..62eabd0b739 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java @@ -19,45 +19,46 @@ package org.apache.flink.api.java.io; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.util.PrintSinkOutputWriter; import org.apache.flink.api.common.io.RichOutputFormat; import org.apache.flink.configuration.Configuration; -import java.io.PrintStream; - /** * Output format that prints results into either stdout or stderr. - * @param <T> + * + * <p> + * Four possible format options: + * {@code sinkIdentifier}:taskId> output <- {@code sinkIdentifier} provided, parallelism > 1 + * {@code sinkIdentifier}> output <- {@code sinkIdentifier} provided, parallelism == 1 + * taskId> output <- no {@code sinkIdentifier} provided, parallelism > 1 + * output <- no {@code sinkIdentifier} provided, parallelism == 1 + * </p> + * + * @param <T> Input record type */ @PublicEvolving public class PrintingOutputFormat<T> extends RichOutputFormat<T> { private static final long serialVersionUID = 1L; - private static final boolean STD_OUT = false; - private static final boolean STD_ERR = true; - - private String sinkIdentifier; - - private boolean target; - - private transient PrintStream stream; - - private transient String prefix; + private final PrintSinkOutputWriter<T> writer; // -------------------------------------------------------------------------------------------- /** * Instantiates a printing output format that prints to standard out. 
*/ - public PrintingOutputFormat() {} + public PrintingOutputFormat() { + writer = new PrintSinkOutputWriter<>(false); + } /** * Instantiates a printing output format that prints to standard out. * * @param stdErr True, if the format should print to standard error instead of standard out. */ - public PrintingOutputFormat(boolean stdErr) { - this.target = stdErr; + public PrintingOutputFormat(final boolean stdErr) { + writer = new PrintSinkOutputWriter<>(stdErr); } /** @@ -65,17 +66,8 @@ public PrintingOutputFormat(boolean stdErr) { * @param sinkIdentifier Message that is prefixed to the output of the value. * @param stdErr True, if the format should print to standard error instead of standard out. */ - public PrintingOutputFormat(String sinkIdentifier, boolean stdErr) { - this(stdErr); - this.sinkIdentifier = sinkIdentifier; - } - - public void setTargetToStandardOut() { - this.target = STD_OUT; - } - - public void setTargetToStandardErr() { - this.target = STD_ERR; + public PrintingOutputFormat(final String sinkIdentifier, final boolean stdErr) { + writer = new PrintSinkOutputWriter<>(sinkIdentifier, stdErr); } @Override @@ -83,46 +75,23 @@ public void configure(Configuration parameters) {} @Override public void open(int taskNumber, int numTasks) { - // get the target stream - this.stream = this.target == STD_OUT ? 
System.out : System.err; - - /** - * Four possible format options: - * sinkId:taskId> output <- sink id provided, parallelism > 1 - * sinkId> output <- sink id provided, parallelism == 1 - * taskId> output <- no sink id provided, parallelism > 1 - * output <- no sink id provided, parallelism == 1 - */ - if (this.sinkIdentifier != null) { - this.prefix = this.sinkIdentifier; - if (numTasks > 1) { - this.prefix += ":" + (taskNumber + 1); - } - this.prefix += "> "; - } else if (numTasks > 1) { - this.prefix = (taskNumber + 1) + "> "; - } else { - this.prefix = ""; - } - + writer.open(taskNumber, numTasks); } @Override public void writeRecord(T record) { - this.stream.println(this.prefix + record.toString()); + writer.write(record); } @Override public void close() { - this.stream = null; - this.prefix = null; - this.sinkIdentifier = null; + } // -------------------------------------------------------------------------------------------- @Override public String toString() { - return "Print to " + (target == STD_OUT ? "System.out" : "System.err"); + return writer.toString(); } } diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/PrintingOutputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/PrintingOutputFormatTest.java new file mode 100644 index 00000000000..0656300420f --- /dev/null +++ b/flink-java/src/test/java/org/apache/flink/api/java/io/PrintingOutputFormatTest.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.io; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for the {@link PrintingOutputFormat}. + */ +public class PrintingOutputFormatTest { + + private final PrintStream originalSystemOut = System.out; + private final PrintStream originalSystemErr = System.err; + + private final ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream(); + private final ByteArrayOutputStream arrayErrorStream = new ByteArrayOutputStream(); + + private final String line = System.lineSeparator(); + + @Before + public void setUp() { + System.setOut(new PrintStream(arrayOutputStream)); + System.setErr(new PrintStream(arrayErrorStream)); + } + + @After + public void tearDown() { + if (System.out != originalSystemOut) { + System.out.close(); + } + if (System.err != originalSystemErr) { + System.err.close(); + } + System.setOut(originalSystemOut); + System.setErr(originalSystemErr); + } + + @Test + public void testPrintOutputFormatStdOut() throws Exception { + PrintingOutputFormat<String> printSink = new PrintingOutputFormat<>(); + printSink.open(0, 1); + + printSink.writeRecord("hello world!"); + + assertEquals("Print to System.out", printSink.toString()); + assertEquals("hello world!" 
+ line, arrayOutputStream.toString()); + printSink.close(); + } + + @Test + public void testPrintOutputFormatStdErr() throws Exception { + PrintingOutputFormat<String> printSink = new PrintingOutputFormat<>(true); + printSink.open(0, 1); + + printSink.writeRecord("hello world!"); + + assertEquals("Print to System.err", printSink.toString()); + assertEquals("hello world!" + line, arrayErrorStream.toString()); + printSink.close(); + } + + @Test + public void testPrintOutputFormatWithPrefix() throws Exception { + PrintingOutputFormat<String> printSink = new PrintingOutputFormat<>(); + printSink.open(1, 2); + + printSink.writeRecord("hello world!"); + + assertEquals("Print to System.out", printSink.toString()); + assertEquals("2> hello world!" + line, arrayOutputStream.toString()); + printSink.close(); + } + + @Test + public void testPrintOutputFormatWithIdentifierAndPrefix() throws Exception { + PrintingOutputFormat<String> printSink = new PrintingOutputFormat<>("mySink", false); + printSink.open(1, 2); + + printSink.writeRecord("hello world!"); + + assertEquals("Print to System.out", printSink.toString()); + assertEquals("mySink:2> hello world!" + line, arrayOutputStream.toString()); + printSink.close(); + } + + @Test + public void testPrintOutputFormatWithIdentifierButNoPrefix() throws Exception { + PrintingOutputFormat<String> printSink = new PrintingOutputFormat<>("mySink", false); + printSink.open(0, 1); + + printSink.writeRecord("hello world!"); + + assertEquals("Print to System.out", printSink.toString()); + assertEquals("mySink> hello world!" 
+ line, arrayOutputStream.toString()); + printSink.close(); + } + +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java index 8e24ad73558..271d9bec33f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java @@ -973,6 +973,40 @@ public ExecutionConfig getExecutionConfig() { return addSink(printFunction).name("Print to Std. Err"); } + /** + * Writes a DataStream to the standard output stream (stdout). + * + * <p>For each element of the DataStream the result of {@link Object#toString()} is written. + * + * <p>NOTE: This will print to stdout on the machine where the code is executed, i.e. the Flink + * worker. + * + * @param sinkIdentifier The string to prefix the output with. + * @return The closed DataStream. + */ + @PublicEvolving + public DataStreamSink<T> print(String sinkIdentifier) { + PrintSinkFunction<T> printFunction = new PrintSinkFunction<>(sinkIdentifier, false); + return addSink(printFunction).name("Print to Std. Out"); + } + + /** + * Writes a DataStream to the standard output stream (stderr). + * + * <p>For each element of the DataStream the result of {@link Object#toString()} is written. + * + * <p>NOTE: This will print to stderr on the machine where the code is executed, i.e. the Flink + * worker. + * + * @param sinkIdentifier The string to prefix the output with. + * @return The closed DataStream. + */ + @PublicEvolving + public DataStreamSink<T> printToErr(String sinkIdentifier) { + PrintSinkFunction<T> printFunction = new PrintSinkFunction<>(sinkIdentifier, true); + return addSink(printFunction).name("Print to Std. Err"); + } + /** * Writes a DataStream to the file specified by path in text format. 
* diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java index e646663ce6c..f0ced9e6d6d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java @@ -18,81 +18,71 @@ package org.apache.flink.streaming.api.functions.sink; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.util.PrintSinkOutputWriter; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; -import java.io.PrintStream; - /** * Implementation of the SinkFunction writing every tuple to the standard * output or standard error stream. * - * @param <IN> - * Input record type + * <p> + * Four possible format options: + * {@code sinkIdentifier}:taskId> output <- {@code sinkIdentifier} provided, parallelism > 1 + * {@code sinkIdentifier}> output <- {@code sinkIdentifier} provided, parallelism == 1 + * taskId> output <- no {@code sinkIdentifier} provided, parallelism > 1 + * output <- no {@code sinkIdentifier} provided, parallelism == 1 + * </p> + * + * @param <IN> Input record type */ @PublicEvolving public class PrintSinkFunction<IN> extends RichSinkFunction<IN> { - private static final long serialVersionUID = 1L; - private static final boolean STD_OUT = false; - private static final boolean STD_ERR = true; + private static final long serialVersionUID = 1L; - private boolean target; - private transient PrintStream stream; - private transient String prefix; + private final PrintSinkOutputWriter<IN> writer; /** * Instantiates a print sink function that prints to standard out. 
*/ - public PrintSinkFunction() {} + public PrintSinkFunction() { + writer = new PrintSinkOutputWriter<>(false); + } /** * Instantiates a print sink function that prints to standard out. * * @param stdErr True, if the format should print to standard error instead of standard out. */ - public PrintSinkFunction(boolean stdErr) { - target = stdErr; - } - - public void setTargetToStandardOut() { - target = STD_OUT; + public PrintSinkFunction(final boolean stdErr) { + writer = new PrintSinkOutputWriter<>(stdErr); } - public void setTargetToStandardErr() { - target = STD_ERR; + /** + * Instantiates a print sink function that prints to standard out and gives a sink identifier. + * + * @param stdErr True, if the format should print to standard error instead of standard out. + * @param sinkIdentifier Message that identify sink and is prefixed to the output of the value + */ + public PrintSinkFunction(final String sinkIdentifier, final boolean stdErr) { + writer = new PrintSinkOutputWriter<>(sinkIdentifier, stdErr); } @Override public void open(Configuration parameters) throws Exception { super.open(parameters); StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext(); - // get the target stream - stream = target == STD_OUT ? System.out : System.err; - - // set the prefix if we have a >1 parallelism - prefix = (context.getNumberOfParallelSubtasks() > 1) ? - ((context.getIndexOfThisSubtask() + 1) + "> ") : null; + writer.open(context.getIndexOfThisSubtask(), context.getNumberOfParallelSubtasks()); } @Override public void invoke(IN record) { - if (prefix != null) { - stream.println(prefix + record.toString()); - } - else { - stream.println(record.toString()); - } - } - - @Override - public void close() { - this.stream = null; - this.prefix = null; + writer.write(record); } @Override public String toString() { - return "Print to " + (target == STD_OUT ? 
"System.out" : "System.err"); + return writer.toString(); } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java index 4d2d6e1f6a5..aa6774dab8f 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java @@ -971,6 +971,10 @@ public boolean filter(Integer value) throws Exception { StreamEdge splitEdge = env.getStreamGraph().getStreamEdges(unionFilter.getId(), sink.getTransformation().getId()).get(0); assertEquals("a", splitEdge.getSelectedNames().get(0)); + DataStreamSink<Integer> sinkWithIdentifier = select.print("identifier"); + StreamEdge newSplitEdge = env.getStreamGraph().getStreamEdges(unionFilter.getId(), sinkWithIdentifier.getTransformation().getId()).get(0); + assertEquals("a", newSplitEdge.getSelectedNames().get(0)); + ConnectedStreams<Integer, Integer> connect = map.connect(flatMap); CoMapFunction<Integer, Integer, String> coMapper = new CoMapFunction<Integer, Integer, String>() { private static final long serialVersionUID = 1L; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java index 4a163387b9e..bef044022dc 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java @@ -66,7 +66,7 @@ public void tearDown() { public void testPrintSinkStdOut() throws Exception { PrintSinkFunction<String> printSink = new PrintSinkFunction<>(); printSink.setRuntimeContext(new MockStreamingRuntimeContext(false, 1, 0)); - printSink.setTargetToStandardOut(); + printSink.open(new 
Configuration()); printSink.invoke("hello world!", SinkContextUtil.forTimestamp(0)); @@ -78,9 +78,8 @@ public void testPrintSinkStdOut() throws Exception { @Test public void testPrintSinkStdErr() throws Exception { - PrintSinkFunction<String> printSink = new PrintSinkFunction<>(); + PrintSinkFunction<String> printSink = new PrintSinkFunction<>(true); printSink.setRuntimeContext(new MockStreamingRuntimeContext(false, 1, 0)); - printSink.setTargetToStandardErr(); printSink.open(new Configuration()); printSink.invoke("hello world!", SinkContextUtil.forTimestamp(0)); @@ -94,7 +93,6 @@ public void testPrintSinkStdErr() throws Exception { public void testPrintSinkWithPrefix() throws Exception { PrintSinkFunction<String> printSink = new PrintSinkFunction<>(); printSink.setRuntimeContext(new MockStreamingRuntimeContext(false, 2, 1)); - printSink.setTargetToStandardOut(); printSink.open(new Configuration()); printSink.invoke("hello world!", SinkContextUtil.forTimestamp(0)); @@ -103,4 +101,31 @@ public void testPrintSinkWithPrefix() throws Exception { assertEquals("2> hello world!" + line, arrayOutputStream.toString()); printSink.close(); } + + @Test + public void testPrintSinkWithIdentifierAndPrefix() throws Exception { + PrintSinkFunction<String> printSink = new PrintSinkFunction<>("mySink", false); + printSink.setRuntimeContext(new MockStreamingRuntimeContext(false, 2, 1)); + printSink.open(new Configuration()); + + printSink.invoke("hello world!", SinkContextUtil.forTimestamp(0)); + + assertEquals("Print to System.out", printSink.toString()); + assertEquals("mySink:2> hello world!" 
+ line, arrayOutputStream.toString()); + printSink.close(); + } + + @Test + public void testPrintSinkWithIdentifierButNoPrefix() throws Exception { + PrintSinkFunction<String> printSink = new PrintSinkFunction<>("mySink", false); + printSink.setRuntimeContext(new MockStreamingRuntimeContext(false, 1, 0)); + printSink.open(new Configuration()); + + printSink.invoke("hello world!", SinkContextUtil.forTimestamp(0)); + + assertEquals("Print to System.out", printSink.toString()); + assertEquals("mySink> hello world!" + line, arrayOutputStream.toString()); + printSink.close(); + } + } diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala index 3a888299111..23260cca787 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala @@ -959,6 +959,29 @@ class DataStream[T](stream: JavaStream[T]) { @PublicEvolving def printToErr() = stream.printToErr() + /** + * Writes a DataStream to the standard output stream (stdout). For each + * element of the DataStream the result of [[AnyRef.toString()]] is + * written. + * + * @param sinkIdentifier The string to prefix the output with. + * @return The closed DataStream. + */ + @PublicEvolving + def print(sinkIdentifier: String): DataStreamSink[T] = stream.print(sinkIdentifier) + + /** + * Writes a DataStream to the standard output stream (stderr). + * + * For each element of the DataStream the result of + * [[AnyRef.toString()]] is written. + * + * @param sinkIdentifier The string to prefix the output with. + * @return The closed DataStream. + */ + @PublicEvolving + def printToErr(sinkIdentifier: String) = stream.printToErr(sinkIdentifier) + /** * Writes a DataStream to the file specified by path in text format. 
For * every element of the DataStream the result of .toString is written. diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala index 9e1c4939365..7b7dde5edae 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala @@ -579,6 +579,12 @@ class DataStreamTest extends AbstractTestBase { env.getStreamGraph.getStreamEdges(unionFilter.getId, sink.getTransformation.getId) assert("a" == splitEdge.get(0).getSelectedNames.get(0)) + val sinkWithIdentifier = select.print("identifier") + val newSplitEdge = env.getStreamGraph.getStreamEdges( + unionFilter.getId, + sinkWithIdentifier.getTransformation.getId) + assert("a" == newSplitEdge.get(0).getSelectedNames.get(0)) + val foldFunction = new FoldFunction[Int, String] { override def fold(accumulator: String, value: Int): String = "" } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org With regards, Apache Git Services
