This is an automated email from the ASF dual-hosted git repository.

sjwiesman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit cce82d56c311d8402b7a2cd14c6f9149a2650832
Author: sjwiesman <[email protected]>
AuthorDate: Mon Nov 15 14:48:56 2021 -0600

    [FLINK-24635][examples] Fix deprecations in socket example
---
 .../examples/socket/SocketWindowWordCount.java     | 30 ++++++++--------------
 .../examples/socket/SocketWindowWordCount.scala    |  2 +-
 2 files changed, 12 insertions(+), 20 deletions(-)

diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java
index 69d03f8..76fb37f 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java
@@ -19,13 +19,12 @@
 package org.apache.flink.streaming.examples.socket;
 
 import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.util.Collector;
 
 /**
  * Implements a streaming windowed version of the "WordCount" program.
@@ -39,7 +38,6 @@ import org.apache.flink.util.Collector;
  *
  * <p>and run this example with the hostname and the port as arguments.
  */
-@SuppressWarnings("serial")
 public class SocketWindowWordCount {
 
     public static void main(String[] args) throws Exception {
@@ -71,24 +69,17 @@ public class SocketWindowWordCount {
         // parse the data, group it, window it, and aggregate the counts
         DataStream<WordWithCount> windowCounts =
                 text.flatMap(
-                                new FlatMapFunction<String, WordWithCount>() {
-                                    @Override
-                                    public void flatMap(
-                                            String value, Collector<WordWithCount> out) {
-                                        for (String word : value.split("\\s")) {
-                                            out.collect(new WordWithCount(word, 1L));
-                                        }
-                                    }
-                                })
+                                (FlatMapFunction<String, WordWithCount>)
+                                        (value, out) -> {
+                                            for (String word : value.split("\\s")) {
+                                                out.collect(new WordWithCount(word, 1L));
+                                            }
+                                        },
+                                Types.POJO(WordWithCount.class))
                         .keyBy(value -> value.word)
                         .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
-                        .reduce(
-                                new ReduceFunction<WordWithCount>() {
-                                    @Override
-                                    public WordWithCount reduce(WordWithCount a, WordWithCount b) {
-                                        return new WordWithCount(a.word, a.count + b.count);
-                                    }
-                                });
+                        .reduce((a, b) -> new WordWithCount(a.word, a.count + b.count))
+                        .returns(WordWithCount.class);
 
         // print the results with a single thread, rather than in parallel
         windowCounts.print().setParallelism(1);
@@ -104,6 +95,7 @@ public class SocketWindowWordCount {
         public String word;
         public long count;
 
+        @SuppressWarnings("unused")
         public WordWithCount() {}
 
         public WordWithCount(String word, long count) {
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketWindowWordCount.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketWindowWordCount.scala
index bab6e6f..fbb98ed 100644
--- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketWindowWordCount.scala
+++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketWindowWordCount.scala
@@ -73,7 +73,7 @@ object SocketWindowWordCount {
           .sum("count")
 
     // print the results with a single thread, rather than in parallel
-    windowCounts.print().setParallelism(1)
+    windowCounts.print()
 
     env.execute("Socket Window WordCount")
   }

Reply via email to