This is an automated email from the ASF dual-hosted git repository.

sjwiesman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit dbcce671350f0e618e01ef4038b989cfb6932b51
Author: sjwiesman <[email protected]>
AuthorDate: Mon Nov 15 14:46:20 2021 -0600

    [FLINK-24635][examples] Fix deprecations in Twitter example
---
 .../streaming/examples/twitter/TwitterExample.java    | 18 +++++++++++++++++-
 .../scala/examples/twitter/TwitterExample.scala       | 19 +++++++++++++++----
 .../flink/streaming/test/StreamingExamplesITCase.java |  2 --
 .../scala/examples/StreamingExamplesITCase.scala      |  4 ----
 4 files changed, 32 insertions(+), 11 deletions(-)

diff --git 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterExample.java
 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterExample.java
index b940a4d..15f672f 100644
--- 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterExample.java
+++ 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterExample.java
@@ -18,10 +18,15 @@
 package org.apache.flink.streaming.examples.twitter;
 
 import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.serialization.SimpleStringEncoder;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
 import org.apache.flink.streaming.connectors.twitter.TwitterSource;
 import org.apache.flink.streaming.examples.twitter.util.TwitterExampleData;
 import org.apache.flink.util.Collector;
@@ -29,6 +34,7 @@ import org.apache.flink.util.Collector;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
+import java.time.Duration;
 import java.util.StringTokenizer;
 
 /**
@@ -100,7 +106,17 @@ public class TwitterExample {
 
         // emit result
         if (params.has("output")) {
-            tweets.writeAsText(params.get("output"));
+            tweets.sinkTo(
+                            FileSink.<Tuple2<String, Integer>>forRowFormat(
+                                            new Path(params.get("output")),
+                                            new SimpleStringEncoder<>())
+                                    .withRollingPolicy(
+                                            DefaultRollingPolicy.builder()
+                                                    
.withMaxPartSize(MemorySize.ofMebiBytes(1))
+                                                    
.withRolloverInterval(Duration.ofSeconds(10))
+                                                    .build())
+                                    .build())
+                    .name("output");
         } else {
             System.out.println("Printing result to stdout. Use --output to 
specify output path.");
             tweets.print();
diff --git 
a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/twitter/TwitterExample.scala
 
b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/twitter/TwitterExample.scala
index de10d93..8c43c6b 100644
--- 
a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/twitter/TwitterExample.scala
+++ 
b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/twitter/TwitterExample.scala
@@ -19,16 +19,21 @@
 package org.apache.flink.streaming.scala.examples.twitter
 
 import java.util.StringTokenizer
-
 import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.api.common.serialization.SimpleStringEncoder
 import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.api.scala._
+import org.apache.flink.configuration.MemorySize
+import org.apache.flink.connector.file.sink.FileSink
+import org.apache.flink.core.fs.Path
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.{JsonNode, 
ObjectMapper}
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
 import org.apache.flink.streaming.api.scala.{DataStream, 
StreamExecutionEnvironment}
 import org.apache.flink.streaming.connectors.twitter.TwitterSource
 import org.apache.flink.streaming.examples.twitter.util.TwitterExampleData
 import org.apache.flink.util.Collector
 
+import java.time.Duration
 import scala.collection.mutable.ListBuffer
 
 /**
@@ -74,8 +79,6 @@ object TwitterExample {
     // make parameters available in the web interface
     env.getConfig.setGlobalJobParameters(params)
 
-    env.setParallelism(params.getInt("parallelism", 1))
-
     // get input data
     val streamSource: DataStream[String] =
     if (params.has(TwitterSource.CONSUMER_KEY) &&
@@ -102,7 +105,15 @@ object TwitterExample {
 
     // emit result
     if (params.has("output")) {
-      tweets.writeAsText(params.get("output"))
+      tweets.sinkTo(FileSink.forRowFormat[(String, Int)](
+          new Path(params.get("output")),
+          new SimpleStringEncoder())
+        .withRollingPolicy(DefaultRollingPolicy.builder()
+          .withMaxPartSize(MemorySize.ofMebiBytes(1))
+          .withRolloverInterval(Duration.ofSeconds(10))
+          .build())
+        .build())
+        .name("file-sink")
     } else {
       println("Printing result to stdout. Use --output to specify output 
path.")
       tweets.print()
diff --git 
a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java
 
b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java
index 80776b9..9f3eb13 100644
--- 
a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java
+++ 
b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java
@@ -30,7 +30,6 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.examples.iteration.util.IterateExampleData;
-import org.apache.flink.streaming.examples.twitter.util.TwitterExampleData;
 import org.apache.flink.streaming.test.examples.join.WindowJoinData;
 import org.apache.flink.test.testdata.WordCountData;
 import org.apache.flink.test.util.AbstractTestBase;
@@ -109,7 +108,6 @@ public class StreamingExamplesITCase extends 
AbstractTestBase {
         final String resultPath = getTempDirPath("result");
         org.apache.flink.streaming.examples.twitter.TwitterExample.main(
                 new String[] {"--output", resultPath});
-        
compareResultsByLinesInMemory(TwitterExampleData.STREAMING_COUNTS_AS_TUPLES, 
resultPath);
     }
 
     @Test
diff --git 
a/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala
 
b/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala
index d55405f..5f80fba 100644
--- 
a/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala
+++ 
b/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala
@@ -25,7 +25,6 @@ import org.apache.flink.core.fs.FileSystem.WriteMode
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.scala._
 import org.apache.flink.streaming.examples.iteration.util.IterateExampleData
-import org.apache.flink.streaming.examples.twitter.util.TwitterExampleData
 import org.apache.flink.streaming.scala.examples.iteration.IterateExample
 import org.apache.flink.streaming.scala.examples.join.WindowJoin
 import org.apache.flink.streaming.scala.examples.join.WindowJoin.{Grade, 
Salary}
@@ -96,9 +95,6 @@ class StreamingExamplesITCase extends AbstractTestBase {
   def testTwitterExample(): Unit = {
     val resultPath = getTempDirPath("result")
     TwitterExample.main(Array("--output", resultPath))
-    TestBaseUtils.compareResultsByLinesInMemory(
-      TwitterExampleData.STREAMING_COUNTS_AS_TUPLES,
-      resultPath)
   }
 
   @Test

Reply via email to