This is an automated email from the ASF dual-hosted git repository. sjwiesman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 0e3273d198ed0f69ffda44dd15daf0290a9a81c9 Author: sjwiesman <[email protected]> AuthorDate: Mon Nov 15 14:52:59 2021 -0600 [FLINK-24635][examples] Fix deprecations in iterations example --- .../streaming/examples/iteration/IterateExample.java | 16 +++++++++++++++- .../scala/examples/iteration/IterateExample.scala | 19 +++++++++++++++++-- 2 files changed, 32 insertions(+), 3 deletions(-) diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java index d8fed37..fa261cc 100644 --- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java @@ -18,18 +18,24 @@ package org.apache.flink.streaming.examples.iteration; import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.serialization.SimpleStringEncoder; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple5; import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.MemorySize; +import org.apache.flink.connector.file.sink.FileSink; +import org.apache.flink.core.fs.Path; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.IterativeStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.ProcessFunction; +import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.util.Collector; import org.apache.flink.util.OutputTag; +import java.time.Duration; import java.util.Random; /** @@ -103,7 +109,15 @@ public class IterateExample { // emit results if (params.has("output")) { - numbers.writeAsText(params.get("output")); + numbers.sinkTo( + FileSink.<Tuple2<Tuple2<Integer, Integer>, Integer>>forRowFormat( + new Path(params.get("output")), new SimpleStringEncoder<>()) + .withRollingPolicy( + DefaultRollingPolicy.builder() + .withMaxPartSize(MemorySize.ofMebiBytes(1)) + .withRolloverInterval(Duration.ofSeconds(10)) + .build()) + .build()); } else { System.out.println("Printing result to stdout. Use --output to specify output path."); numbers.print(); diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/iteration/IterateExample.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/iteration/IterateExample.scala index 1fa3ace..4812ec4 100644 --- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/iteration/IterateExample.scala +++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/iteration/IterateExample.scala @@ -18,14 +18,21 @@ package org.apache.flink.streaming.scala.examples.iteration -import java.util.Random +import org.apache.flink.api.common.serialization.SimpleStringEncoder +import java.util.Random import org.apache.flink.api.java.utils.ParameterTool import org.apache.flink.api.scala._ +import org.apache.flink.configuration.MemorySize +import org.apache.flink.connector.file.sink.FileSink +import org.apache.flink.core.fs.Path +import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy import org.apache.flink.streaming.api.functions.source.SourceFunction import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} +import java.time.Duration + /** * Example illustrating iterations in Flink streaming. * @@ -95,7 +102,15 @@ object IterateExample { ) if (params.has("output")) { - numbers.writeAsText(params.get("output")) + numbers.sinkTo(FileSink.forRowFormat[((Int, Int), Int)]( + new Path(params.get("output")), + new SimpleStringEncoder()) + .withRollingPolicy(DefaultRollingPolicy.builder() + .withMaxPartSize(MemorySize.ofMebiBytes(1)) + .withRolloverInterval(Duration.ofSeconds(10)) + .build()) + .build()) + .name("file-sink") } else { println("Printing result to stdout. Use --output to specify output path.") numbers.print()
