This is an automated email from the ASF dual-hosted git repository. sjwiesman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit cfada4144c89eb407b127ba6d03086bc3f1066b0 Author: sjwiesman <[email protected]> AuthorDate: Mon Nov 15 14:46:58 2021 -0600 [FLINK-24635][examples] Fix deprecations in state machine example --- .../examples/statemachine/StateMachineExample.java | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java index 72a1587..9763862 100644 --- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java @@ -20,18 +20,22 @@ package org.apache.flink.streaming.examples.statemachine; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.api.common.serialization.SimpleStringEncoder; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.MemorySize; +import org.apache.flink.connector.file.sink.FileSink; import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema; import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend; -import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.state.hashmap.HashMapStateBackend; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy; import org.apache.flink.streaming.examples.statemachine.dfa.State; import org.apache.flink.streaming.examples.statemachine.event.Alert; import org.apache.flink.streaming.examples.statemachine.event.Event; @@ -39,6 +43,8 @@ import org.apache.flink.streaming.examples.statemachine.generator.EventsGenerato import org.apache.flink.streaming.examples.statemachine.kafka.EventDeSerializationSchema; import org.apache.flink.util.Collector; +import java.time.Duration; + /** * Main class of the state machine example. This class implements the streaming application that * receives the stream of events and evaluates a state machine (per originating address) to validate @@ -140,7 +146,17 @@ public class StateMachineExample { if (outputFile == null) { alerts.print(); } else { - alerts.writeAsText(outputFile, FileSystem.WriteMode.OVERWRITE).setParallelism(1); + alerts.sinkTo( + FileSink.<Alert>forRowFormat( + new Path(outputFile), new SimpleStringEncoder<>()) + .withRollingPolicy( + DefaultRollingPolicy.builder() + .withMaxPartSize(MemorySize.ofMebiBytes(1)) + .withRolloverInterval(Duration.ofSeconds(10)) + .build()) + .build()) + .setParallelism(1) + .name("output"); } // trigger program execution
