This is an automated email from the ASF dual-hosted git repository. sjwiesman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 1eee012e13db8c0a8fd60d8d1d4ef3f312f8225b Author: sjwiesman <[email protected]> AuthorDate: Mon Nov 15 14:50:22 2021 -0600 [FLINK-24635][examples] Fix deprecations in side output example --- .../examples/sideoutput/SideOutputExample.java | 57 +++++++++++++--------- 1 file changed, 34 insertions(+), 23 deletions(-) diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/sideoutput/SideOutputExample.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/sideoutput/SideOutputExample.java index 48af9f2..e72804b 100644 --- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/sideoutput/SideOutputExample.java +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/sideoutput/SideOutputExample.java @@ -23,20 +23,26 @@ import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier; import org.apache.flink.api.common.eventtime.WatermarkGenerator; import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier; import org.apache.flink.api.common.eventtime.WatermarkStrategy; -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.common.serialization.SimpleStringEncoder; +import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.MemorySize; +import org.apache.flink.connector.file.sink.FileSink; +import org.apache.flink.core.fs.Path; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.ProcessFunction; +import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.examples.wordcount.util.WordCountData; import org.apache.flink.util.Collector; import org.apache.flink.util.OutputTag; +import java.time.Duration; + /** * An example that illustrates the use of side output. * @@ -81,29 +87,12 @@ public class SideOutputExample { text.assignTimestampsAndWatermarks(IngestionTimeWatermarkStrategy.create()); SingleOutputStreamOperator<Tuple2<String, Integer>> tokenized = - text.keyBy( - new KeySelector<String, Integer>() { - private static final long serialVersionUID = 1L; - - @Override - public Integer getKey(String value) throws Exception { - return 0; - } - }) - .process(new Tokenizer()); + text.process(new Tokenizer()); DataStream<String> rejectedWords = tokenized .getSideOutput(rejectedWordsTag) - .map( - new MapFunction<String, String>() { - private static final long serialVersionUID = 1L; - - @Override - public String map(String value) throws Exception { - return "rejected: " + value; - } - }); + .map(value -> "rejected: " + value, Types.STRING); DataStream<Tuple2<String, Integer>> counts = tokenized @@ -114,8 +103,30 @@ public class SideOutputExample { // emit result if (params.has("output")) { - counts.writeAsText(params.get("output")); - rejectedWords.writeAsText(params.get("rejected-words-output")); + counts.sinkTo( + FileSink.<Tuple2<String, Integer>>forRowFormat( + new Path(params.get("output")), + new SimpleStringEncoder<>()) + .withRollingPolicy( + DefaultRollingPolicy.builder() + .withMaxPartSize(MemorySize.ofMebiBytes(1)) + .withRolloverInterval(Duration.ofSeconds(10)) + .build()) + .build()) + .name("output"); + + rejectedWords + .sinkTo( + FileSink.<String>forRowFormat( + new Path(params.get("rejected-words-output")), + new SimpleStringEncoder<>()) + .withRollingPolicy( + DefaultRollingPolicy.builder() + .withMaxPartSize(MemorySize.ofMebiBytes(1)) + .withRolloverInterval(Duration.ofSeconds(10)) + .build()) + .build()) + .name("rejected-words-output"); } else { System.out.println("Printing result to stdout. Use --output to specify output path."); counts.print();
