This is an automated email from the ASF dual-hosted git repository.

sjwiesman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit dbcce671350f0e618e01ef4038b989cfb6932b51 Author: sjwiesman <[email protected]> AuthorDate: Mon Nov 15 14:46:20 2021 -0600 [FLINK-24635][examples] Fix deprecations in Twitter example --- .../streaming/examples/twitter/TwitterExample.java | 18 +++++++++++++++++- .../scala/examples/twitter/TwitterExample.scala | 19 +++++++++++++++---- .../flink/streaming/test/StreamingExamplesITCase.java | 2 -- .../scala/examples/StreamingExamplesITCase.scala | 4 ---- 4 files changed, 32 insertions(+), 11 deletions(-) diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterExample.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterExample.java index b940a4d..15f672f 100644 --- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterExample.java +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterExample.java @@ -18,10 +18,15 @@ package org.apache.flink.streaming.examples.twitter; import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.serialization.SimpleStringEncoder; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.MemorySize; +import org.apache.flink.connector.file.sink.FileSink; +import org.apache.flink.core.fs.Path; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy; import org.apache.flink.streaming.connectors.twitter.TwitterSource; import org.apache.flink.streaming.examples.twitter.util.TwitterExampleData; import org.apache.flink.util.Collector; @@ -29,6 +34,7 @@ import org.apache.flink.util.Collector; import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import java.time.Duration; import java.util.StringTokenizer; /** @@ -100,7 +106,17 @@ public class TwitterExample { // emit result if (params.has("output")) { - tweets.writeAsText(params.get("output")); + tweets.sinkTo( + FileSink.<Tuple2<String, Integer>>forRowFormat( + new Path(params.get("output")), + new SimpleStringEncoder<>()) + .withRollingPolicy( + DefaultRollingPolicy.builder() + .withMaxPartSize(MemorySize.ofMebiBytes(1)) + .withRolloverInterval(Duration.ofSeconds(10)) + .build()) + .build()) + .name("output"); } else { System.out.println("Printing result to stdout. Use --output to specify output path."); tweets.print(); diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/twitter/TwitterExample.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/twitter/TwitterExample.scala index de10d93..8c43c6b 100644 --- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/twitter/TwitterExample.scala +++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/twitter/TwitterExample.scala @@ -19,16 +19,21 @@ package org.apache.flink.streaming.scala.examples.twitter import java.util.StringTokenizer - import org.apache.flink.api.common.functions.FlatMapFunction +import org.apache.flink.api.common.serialization.SimpleStringEncoder import org.apache.flink.api.java.utils.ParameterTool import org.apache.flink.api.scala._ +import org.apache.flink.configuration.MemorySize +import org.apache.flink.connector.file.sink.FileSink +import org.apache.flink.core.fs.Path import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.{JsonNode, ObjectMapper} +import 
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} import org.apache.flink.streaming.connectors.twitter.TwitterSource import org.apache.flink.streaming.examples.twitter.util.TwitterExampleData import org.apache.flink.util.Collector +import java.time.Duration import scala.collection.mutable.ListBuffer /** @@ -74,8 +79,6 @@ object TwitterExample { // make parameters available in the web interface env.getConfig.setGlobalJobParameters(params) - env.setParallelism(params.getInt("parallelism", 1)) - // get input data val streamSource: DataStream[String] = if (params.has(TwitterSource.CONSUMER_KEY) && @@ -102,7 +105,15 @@ object TwitterExample { // emit result if (params.has("output")) { - tweets.writeAsText(params.get("output")) + tweets.sinkTo(FileSink.forRowFormat[(String, Int)]( + new Path(params.get("output")), + new SimpleStringEncoder()) + .withRollingPolicy(DefaultRollingPolicy.builder() + .withMaxPartSize(MemorySize.ofMebiBytes(1)) + .withRolloverInterval(Duration.ofSeconds(10)) + .build()) + .build()) + .name("file-sink") } else { println("Printing result to stdout. 
Use --output to specify output path.") tweets.print() diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java index 80776b9..9f3eb13 100644 --- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java +++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java @@ -30,7 +30,6 @@ import org.apache.flink.core.fs.FileSystem; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.examples.iteration.util.IterateExampleData; -import org.apache.flink.streaming.examples.twitter.util.TwitterExampleData; import org.apache.flink.streaming.test.examples.join.WindowJoinData; import org.apache.flink.test.testdata.WordCountData; import org.apache.flink.test.util.AbstractTestBase; @@ -109,7 +108,6 @@ public class StreamingExamplesITCase extends AbstractTestBase { final String resultPath = getTempDirPath("result"); org.apache.flink.streaming.examples.twitter.TwitterExample.main( new String[] {"--output", resultPath}); - compareResultsByLinesInMemory(TwitterExampleData.STREAMING_COUNTS_AS_TUPLES, resultPath); } @Test diff --git a/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala b/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala index d55405f..5f80fba 100644 --- a/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala +++ 
b/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala @@ -25,7 +25,6 @@ import org.apache.flink.core.fs.FileSystem.WriteMode import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.examples.iteration.util.IterateExampleData -import org.apache.flink.streaming.examples.twitter.util.TwitterExampleData import org.apache.flink.streaming.scala.examples.iteration.IterateExample import org.apache.flink.streaming.scala.examples.join.WindowJoin import org.apache.flink.streaming.scala.examples.join.WindowJoin.{Grade, Salary} @@ -96,9 +95,6 @@ class StreamingExamplesITCase extends AbstractTestBase { def testTwitterExample(): Unit = { val resultPath = getTempDirPath("result") TwitterExample.main(Array("--output", resultPath)) - TestBaseUtils.compareResultsByLinesInMemory( - TwitterExampleData.STREAMING_COUNTS_AS_TUPLES, - resultPath) } @Test
