This is an automated email from the ASF dual-hosted git repository.

sjwiesman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1eee012e13db8c0a8fd60d8d1d4ef3f312f8225b
Author: sjwiesman <[email protected]>
AuthorDate: Mon Nov 15 14:50:22 2021 -0600

    [FLINK-24635][examples] Fix deprecations in side output example
---
 .../examples/sideoutput/SideOutputExample.java     | 57 +++++++++++++---------
 1 file changed, 34 insertions(+), 23 deletions(-)

diff --git 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/sideoutput/SideOutputExample.java
 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/sideoutput/SideOutputExample.java
index 48af9f2..e72804b 100644
--- 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/sideoutput/SideOutputExample.java
+++ 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/sideoutput/SideOutputExample.java
@@ -23,20 +23,26 @@ import 
org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
 import org.apache.flink.api.common.eventtime.WatermarkGenerator;
 import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.common.serialization.SimpleStringEncoder;
+import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
 import 
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.examples.wordcount.util.WordCountData;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.OutputTag;
 
+import java.time.Duration;
+
 /**
  * An example that illustrates the use of side output.
  *
@@ -81,29 +87,12 @@ public class SideOutputExample {
         
text.assignTimestampsAndWatermarks(IngestionTimeWatermarkStrategy.create());
 
         SingleOutputStreamOperator<Tuple2<String, Integer>> tokenized =
-                text.keyBy(
-                                new KeySelector<String, Integer>() {
-                                    private static final long serialVersionUID 
= 1L;
-
-                                    @Override
-                                    public Integer getKey(String value) throws 
Exception {
-                                        return 0;
-                                    }
-                                })
-                        .process(new Tokenizer());
+                text.process(new Tokenizer());
 
         DataStream<String> rejectedWords =
                 tokenized
                         .getSideOutput(rejectedWordsTag)
-                        .map(
-                                new MapFunction<String, String>() {
-                                    private static final long serialVersionUID 
= 1L;
-
-                                    @Override
-                                    public String map(String value) throws 
Exception {
-                                        return "rejected: " + value;
-                                    }
-                                });
+                        .map(value -> "rejected: " + value, Types.STRING);
 
         DataStream<Tuple2<String, Integer>> counts =
                 tokenized
@@ -114,8 +103,30 @@ public class SideOutputExample {
 
         // emit result
         if (params.has("output")) {
-            counts.writeAsText(params.get("output"));
-            rejectedWords.writeAsText(params.get("rejected-words-output"));
+            counts.sinkTo(
+                            FileSink.<Tuple2<String, Integer>>forRowFormat(
+                                            new Path(params.get("output")),
+                                            new SimpleStringEncoder<>())
+                                    .withRollingPolicy(
+                                            DefaultRollingPolicy.builder()
+                                                    
.withMaxPartSize(MemorySize.ofMebiBytes(1))
+                                                    
.withRolloverInterval(Duration.ofSeconds(10))
+                                                    .build())
+                                    .build())
+                    .name("output");
+
+            rejectedWords
+                    .sinkTo(
+                            FileSink.<String>forRowFormat(
+                                            new 
Path(params.get("rejected-words-output")),
+                                            new SimpleStringEncoder<>())
+                                    .withRollingPolicy(
+                                            DefaultRollingPolicy.builder()
+                                                    
.withMaxPartSize(MemorySize.ofMebiBytes(1))
+                                                    
.withRolloverInterval(Duration.ofSeconds(10))
+                                                    .build())
+                                    .build())
+                    .name("rejected-words-output");
         } else {
             System.out.println("Printing result to stdout. Use --output to 
specify output path.");
             counts.print();

Reply via email to