This is an automated email from the ASF dual-hosted git repository. sjwiesman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit cce82d56c311d8402b7a2cd14c6f9149a2650832 Author: sjwiesman <[email protected]> AuthorDate: Mon Nov 15 14:48:56 2021 -0600 [FLINK-24635][examples] Fix deprecations in socket example --- .../examples/socket/SocketWindowWordCount.java | 30 ++++++++-------------- .../examples/socket/SocketWindowWordCount.scala | 2 +- 2 files changed, 12 insertions(+), 20 deletions(-) diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java index 69d03f8..76fb37f 100644 --- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java @@ -19,13 +19,12 @@ package org.apache.flink.streaming.examples.socket; import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; -import org.apache.flink.util.Collector; /** * Implements a streaming windowed version of the "WordCount" program. @@ -39,7 +38,6 @@ import org.apache.flink.util.Collector; * * <p>and run this example with the hostname and the port as arguments. */ -@SuppressWarnings("serial") public class SocketWindowWordCount { public static void main(String[] args) throws Exception { @@ -71,24 +69,17 @@ public class SocketWindowWordCount { // parse the data, group it, window it, and aggregate the counts DataStream<WordWithCount> windowCounts = text.flatMap( - new FlatMapFunction<String, WordWithCount>() { - @Override - public void flatMap( - String value, Collector<WordWithCount> out) { - for (String word : value.split("\\s")) { - out.collect(new WordWithCount(word, 1L)); - } - } - }) + (FlatMapFunction<String, WordWithCount>) + (value, out) -> { + for (String word : value.split("\\s")) { + out.collect(new WordWithCount(word, 1L)); + } + }, + Types.POJO(WordWithCount.class)) .keyBy(value -> value.word) .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) - .reduce( - new ReduceFunction<WordWithCount>() { - @Override - public WordWithCount reduce(WordWithCount a, WordWithCount b) { - return new WordWithCount(a.word, a.count + b.count); - } - }); + .reduce((a, b) -> new WordWithCount(a.word, a.count + b.count)) + .returns(WordWithCount.class); // print the results with a single thread, rather than in parallel windowCounts.print().setParallelism(1); @@ -104,6 +95,7 @@ public class SocketWindowWordCount { public String word; public long count; + @SuppressWarnings("unused") public WordWithCount() {} public WordWithCount(String word, long count) { diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketWindowWordCount.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketWindowWordCount.scala index bab6e6f..fbb98ed 100644 --- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketWindowWordCount.scala +++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketWindowWordCount.scala @@ -73,7 +73,7 @@ object SocketWindowWordCount { .sum("count") // print the results with a single thread, rather than in parallel - windowCounts.print().setParallelism(1) + windowCounts.print() env.execute("Socket Window WordCount") }
