This is an automated email from the ASF dual-hosted git repository.

sjwiesman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit cfada4144c89eb407b127ba6d03086bc3f1066b0
Author: sjwiesman <[email protected]>
AuthorDate: Mon Nov 15 14:46:58 2021 -0600

    [FLINK-24635][examples] Fix deprecations in state machine example
---
 .../examples/statemachine/StateMachineExample.java   | 20 ++++++++++++++++++--
 1 file changed, 18 insertions(+), 2 deletions(-)

diff --git 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java
 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java
index 72a1587..9763862 100644
--- 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java
+++ 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java
@@ -20,18 +20,22 @@ package org.apache.flink.streaming.examples.statemachine;
 
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.serialization.SimpleStringEncoder;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.connector.file.sink.FileSink;
 import org.apache.flink.connector.kafka.source.KafkaSource;
 import 
org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
 import 
org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
 import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
-import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
 import org.apache.flink.streaming.examples.statemachine.dfa.State;
 import org.apache.flink.streaming.examples.statemachine.event.Alert;
 import org.apache.flink.streaming.examples.statemachine.event.Event;
@@ -39,6 +43,8 @@ import 
org.apache.flink.streaming.examples.statemachine.generator.EventsGenerato
 import 
org.apache.flink.streaming.examples.statemachine.kafka.EventDeSerializationSchema;
 import org.apache.flink.util.Collector;
 
+import java.time.Duration;
+
 /**
  * Main class of the state machine example. This class implements the 
streaming application that
  * receives the stream of events and evaluates a state machine (per 
originating address) to validate
@@ -140,7 +146,17 @@ public class StateMachineExample {
         if (outputFile == null) {
             alerts.print();
         } else {
-            alerts.writeAsText(outputFile, 
FileSystem.WriteMode.OVERWRITE).setParallelism(1);
+            alerts.sinkTo(
+                            FileSink.<Alert>forRowFormat(
+                                            new Path(outputFile), new 
SimpleStringEncoder<>())
+                                    .withRollingPolicy(
+                                            DefaultRollingPolicy.builder()
+                                                    
.withMaxPartSize(MemorySize.ofMebiBytes(1))
+                                                    
.withRolloverInterval(Duration.ofSeconds(10))
+                                                    .build())
+                                    .build())
+                    .setParallelism(1)
+                    .name("output");
         }
 
         // trigger program execution

Reply via email to