[
https://issues.apache.org/jira/browse/FLINK-22551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Flink Jira Bot updated FLINK-22551:
-----------------------------------
Labels: stale-critical (was: )
I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help
the community manage its development. I see this issue has been marked as
Critical but is unassigned and neither itself nor its Sub-Tasks have been
updated for 7 days. I have gone ahead and marked it "stale-critical". If this
ticket is critical, please either assign yourself or give an update.
Afterwards, please remove the label or in 7 days the issue will be
deprioritized.
> checkpoints: strange behaviour
> -------------------------------
>
> Key: FLINK-22551
> URL: https://issues.apache.org/jira/browse/FLINK-22551
> Project: Flink
> Issue Type: Bug
> Affects Versions: 1.13.0
> Environment: {code:java}
> java -version
> openjdk version "11.0.2" 2019-01-15
> OpenJDK Runtime Environment 18.9 (build 11.0.2+9)
> OpenJDK 64-Bit Server VM 18.9 (build 11.0.2+9, mixed mode)
> {code}
> Reporter: buom
> Priority: Critical
> Labels: stale-critical
>
> * +*Case 1*:+ Works as expected
> {code:java}
> public class Example {
> public static class ExampleSource extends RichSourceFunction<String>
> implements CheckpointedFunction {
> private volatile boolean isRunning = true;
> @Override
> public void open(Configuration parameters) throws Exception {
> System.out.println("[source] invoke open()");
> }
> @Override
> public void close() throws Exception {
> isRunning = false;
> System.out.println("[source] invoke close()");
> }
> @Override
> public void run(SourceContext<String> ctx) throws Exception {
> System.out.println("[source] invoke run()");
> while (isRunning) {
> ctx.collect("Flink");
> Thread.sleep(500);
> }
> }
> @Override
> public void cancel() {
> isRunning = false;
> System.out.println("[source] invoke cancel()");
> }
> @Override
> public void snapshotState(FunctionSnapshotContext context) throws
> Exception {
> System.out.println("[source] invoke snapshotState()");
> }
> @Override
> public void initializeState(FunctionInitializationContext context)
> throws Exception {
> System.out.println("[source] invoke initializeState()");
> }
> }
> public static class ExampleSink extends PrintSinkFunction<String>
> implements CheckpointedFunction {
> @Override
> public void snapshotState(FunctionSnapshotContext context) throws
> Exception {
> System.out.println("[sink] invoke snapshotState()");
> }
> @Override
> public void initializeState(FunctionInitializationContext context)
> throws Exception {
> System.out.println("[sink] invoke initializeState()");
> }
> }
> public static void main(String[] args) throws Exception {
> final StreamExecutionEnvironment env =
>
> StreamExecutionEnvironment.getExecutionEnvironment().enableCheckpointing(1000);
> DataStream<String> stream = env.addSource(new ExampleSource());
> stream.addSink(new ExampleSink()).setParallelism(1);
> env.execute();
> }
> }
> {code}
> {code:java}
> $ java -jar ./example.jar
> [sink] invoke initializeState()
> [source] invoke initializeState()
> [source] invoke open()
> [source] invoke run()
> Flink
> [sink] invoke snapshotState()
> [source] invoke snapshotState()
> Flink
> Flink
> [sink] invoke snapshotState()
> [source] invoke snapshotState()
> Flink
> Flink
> [sink] invoke snapshotState()
> [source] invoke snapshotState()
> ^C
> {code}
> * *+Case 2:+* Does not work as expected (w/ _parallelism = 1_)
> {code:java}
> public class Example {
> public static class ExampleSource extends RichSourceFunction<String>
> implements CheckpointedFunction {
> private volatile boolean isRunning = true;
> @Override
> public void open(Configuration parameters) throws Exception {
> System.out.println("[source] invoke open()");
> }
> @Override
> public void close() throws Exception {
> isRunning = false;
> System.out.println("[source] invoke close()");
> }
> @Override
> public void run(SourceContext<String> ctx) throws Exception {
> System.out.println("[source] invoke run()");
> while (isRunning) {
> ctx.collect("Flink");
> Thread.sleep(500);
> }
> }
> @Override
> public void cancel() {
> isRunning = false;
> System.out.println("[source] invoke cancel()");
> }
> @Override
> public void snapshotState(FunctionSnapshotContext context) throws
> Exception {
> System.out.println("[source] invoke snapshotState()");
> }
> @Override
> public void initializeState(FunctionInitializationContext context)
> throws Exception {
> System.out.println("[source] invoke initializeState()");
> }
> }
> public static void main(String[] args) throws Exception {
> final StreamExecutionEnvironment env =
>
> StreamExecutionEnvironment.getExecutionEnvironment().enableCheckpointing(1000);
> DataStream<String> stream = env.addSource(new ExampleSource());
> Properties properties = new Properties();
> properties.setProperty("bootstrap.servers", "localhost:9092");
> String topic = "my-topic";
> FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
> topic,
> (element, timestamp) -> {
> byte[] value = element.getBytes(StandardCharsets.UTF_8);
> return new ProducerRecord<>(topic, null, timestamp, null,
> value, null);
> },
> properties,
> FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
> stream.addSink(kafkaProducer).setParallelism(1);
> env.execute();
> }
> }
> {code}
> {code:java}
> $ java -jar ./example.jar
> [source] invoke cancel()
> [source] invoke cancel()
> [source] invoke cancel()
> [source] invoke cancel()
> ^C%
> {code}
> +*Case 3*+: Does not work as expected (w/o setting _parallelism_)
> {code:java}
> public class Example {
> public static class ExampleSource extends RichSourceFunction<String>
> implements CheckpointedFunction {
> private volatile boolean isRunning = true;
> @Override
> public void open(Configuration parameters) throws Exception {
> System.out.println("[source] invoke open()");
> }
> @Override
> public void close() throws Exception {
> isRunning = false;
> System.out.println("[source] invoke close()");
> }
> @Override
> public void run(SourceContext<String> ctx) throws Exception {
> System.out.println("[source] invoke run()");
> while (isRunning) {
> ctx.collect("Flink");
> Thread.sleep(500);
> }
> }
> @Override
> public void cancel() {
> isRunning = false;
> System.out.println("[source] invoke cancel()");
> }
> @Override
> public void snapshotState(FunctionSnapshotContext context) throws
> Exception {
> System.out.println("[source] invoke snapshotState()");
> }
> @Override
> public void initializeState(FunctionInitializationContext context)
> throws Exception {
> System.out.println("[source] invoke initializeState()");
> }
> }
> public static class ExampleSink extends PrintSinkFunction<String>
> implements CheckpointedFunction {
> @Override
> public void snapshotState(FunctionSnapshotContext context) throws
> Exception {
> System.out.println("[sink] invoke snapshotState()");
> }
> @Override
> public void initializeState(FunctionInitializationContext context)
> throws Exception {
> System.out.println("[sink] invoke initializeState()");
> }
> }
> public static void main(String[] args) throws Exception {
> final StreamExecutionEnvironment env =
>
> StreamExecutionEnvironment.getExecutionEnvironment().enableCheckpointing(1000);
> DataStream<String> stream = env.addSource(new ExampleSource());
> Properties properties = new Properties();
> properties.setProperty("bootstrap.servers", "localhost:9092");
> String topic = "my-topic";
> FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
> topic,
> (element, timestamp) -> {
> byte[] value = element.getBytes(StandardCharsets.UTF_8);
> return new ProducerRecord<>(topic, null, timestamp, null,
> value, null);
> },
> properties,
> FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
> stream.addSink(kafkaProducer);
> env.execute();
> }
> }{code}
> {code:java}
> $ java -jar ./example.jar
> [source] invoke initializeState()
> [source] invoke open()
> [source] invoke run()
> [source] invoke cancel()
> [source] invoke close()
> [source] invoke initializeState()
> [source] invoke open()
> [source] invoke run()
> [source] invoke cancel()
> [source] invoke close()
> [source] invoke initializeState()
> [source] invoke open()
> [source] invoke run()
> [source] invoke snapshotState()
> [source] invoke cancel()
> [source] invoke close()
> ^C%
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)