This is an automated email from the ASF dual-hosted git repository.

sjwiesman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0e3273d198ed0f69ffda44dd15daf0290a9a81c9
Author: sjwiesman <[email protected]>
AuthorDate: Mon Nov 15 14:52:59 2021 -0600

    [FLINK-24635][examples] Fix deprecations in iterations example
---
 .../streaming/examples/iteration/IterateExample.java  | 16 +++++++++++++++-
 .../scala/examples/iteration/IterateExample.scala     | 19 +++++++++++++++++--
 2 files changed, 32 insertions(+), 3 deletions(-)

diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
index d8fed37..fa261cc 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
@@ -18,18 +18,24 @@
 package org.apache.flink.streaming.examples.iteration;
 
 import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.serialization.SimpleStringEncoder;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.IterativeStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.OutputTag;
 
+import java.time.Duration;
 import java.util.Random;
 
 /**
@@ -103,7 +109,15 @@ public class IterateExample {
 
         // emit results
         if (params.has("output")) {
-            numbers.writeAsText(params.get("output"));
+            numbers.sinkTo(
+                    FileSink.<Tuple2<Tuple2<Integer, Integer>, Integer>>forRowFormat(
+                                    new Path(params.get("output")), new SimpleStringEncoder<>())
+                            .withRollingPolicy(
+                                    DefaultRollingPolicy.builder()
+                                            .withMaxPartSize(MemorySize.ofMebiBytes(1))
+                                            .withRolloverInterval(Duration.ofSeconds(10))
+                                            .build())
+                            .build());
         } else {
             System.out.println("Printing result to stdout. Use --output to specify output path.");
             numbers.print();
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/iteration/IterateExample.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/iteration/IterateExample.scala
index 1fa3ace..4812ec4 100644
--- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/iteration/IterateExample.scala
+++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/iteration/IterateExample.scala
@@ -18,14 +18,21 @@
 
 package org.apache.flink.streaming.scala.examples.iteration
 
-import java.util.Random
+import org.apache.flink.api.common.serialization.SimpleStringEncoder
 
+import java.util.Random
 import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.api.scala._
+import org.apache.flink.configuration.MemorySize
+import org.apache.flink.connector.file.sink.FileSink
+import org.apache.flink.core.fs.Path
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
 import org.apache.flink.streaming.api.functions.source.SourceFunction
 import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
 import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
 
+import java.time.Duration
+
 /**
  * Example illustrating iterations in Flink streaming.
  *
@@ -95,7 +102,15 @@ object IterateExample {
       )
 
     if (params.has("output")) {
-      numbers.writeAsText(params.get("output"))
+      numbers.sinkTo(FileSink.forRowFormat[((Int, Int), Int)](
+          new Path(params.get("output")),
+          new SimpleStringEncoder())
+        .withRollingPolicy(DefaultRollingPolicy.builder()
+          .withMaxPartSize(MemorySize.ofMebiBytes(1))
+          .withRolloverInterval(Duration.ofSeconds(10))
+          .build())
+        .build())
+        .name("file-sink")
     } else {
       println("Printing result to stdout. Use --output to specify output path.")
       numbers.print()

Reply via email to