[ 
https://issues.apache.org/jira/browse/FLINK-22551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

buom updated FLINK-22551:
-------------------------
    Description: 
* +*Case 1*:+ Work as expected

{code:java}
public class Example {

    public static class ExampleSource extends RichSourceFunction<String>
            implements CheckpointedFunction {

        private volatile boolean isRunning = true;

        @Override
        public void open(Configuration parameters) throws Exception {
            System.out.println("[source] invoke open()");
        }

        @Override
        public void close() throws Exception {
            isRunning = false;
            System.out.println("[source] invoke close()");
        }

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            System.out.println("[source] invoke run()");
            while (isRunning) {
                ctx.collect("Flink");
                Thread.sleep(500);
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
            System.out.println("[source] invoke cancel()");
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
            System.out.println("[source] invoke snapshotState()");
        }

        @Override
        public void initializeState(FunctionInitializationContext context) 
throws Exception {
            System.out.println("[source] invoke initializeState()");
        }

    }

    public static class ExampleSink extends PrintSinkFunction<String>
            implements CheckpointedFunction {

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
            System.out.println("[sink] invoke snapshotState()");
        }

        @Override
        public void initializeState(FunctionInitializationContext context) 
throws Exception {
            System.out.println("[sink] invoke initializeState()");
        }
    }

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env =
                
StreamExecutionEnvironment.getExecutionEnvironment().enableCheckpointing(1000);

        DataStream<String> stream = env.addSource(new ExampleSource());
        stream.addSink(new ExampleSink()).setParallelism(1);

        env.execute();
    }
}
{code}
{code:java}
$ java -jar ./example.jar

[sink] invoke initializeState()
[source] invoke initializeState()
[source] invoke open()
[source] invoke run()
Flink
[sink] invoke snapshotState()
[source] invoke snapshotState()
Flink
Flink
[sink] invoke snapshotState()
[source] invoke snapshotState()
Flink
Flink
[sink] invoke snapshotState()
[source] invoke snapshotState()
^C
{code}
 * *+Case 2:+* Run as unexpected (w/ _parallelism = 1_)

{code:java}
public class Example {

    public static class ExampleSource extends RichSourceFunction<String>
            implements CheckpointedFunction {

        private volatile boolean isRunning = true;

        @Override
        public void open(Configuration parameters) throws Exception {
            System.out.println("[source] invoke open()");
        }

        @Override
        public void close() throws Exception {
            isRunning = false;
            System.out.println("[source] invoke close()");
        }

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            System.out.println("[source] invoke run()");
            while (isRunning) {
                ctx.collect("Flink");
                Thread.sleep(500);
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
            System.out.println("[source] invoke cancel()");
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
            System.out.println("[source] invoke snapshotState()");
        }

        @Override
        public void initializeState(FunctionInitializationContext context) 
throws Exception {
            System.out.println("[source] invoke initializeState()");
        }
    }


    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env =
                
StreamExecutionEnvironment.getExecutionEnvironment().enableCheckpointing(1000);

        DataStream<String> stream = env.addSource(new ExampleSource());

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        String topic = "my-topic";

        FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
                topic,
                (element, timestamp) -> {
                    byte[] value = element.getBytes(StandardCharsets.UTF_8);
                    return new ProducerRecord<>(topic, null, timestamp, null, 
value, null);
                },
                properties,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

        stream.addSink(kafkaProducer).setParallelism(1);

        env.execute();
    }

}
{code}
{code:java}
 $ java -jar ./example.jar

[source] invoke cancel() 
[source] invoke cancel() 
[source] invoke cancel() 
[source] invoke cancel() 
^C%
{code}
+*Case 3*+: Run as unexpected (wo/ _parallelism_)
{code:java}
public class Example {

    public static class ExampleSource extends RichSourceFunction<String>
            implements CheckpointedFunction {

        private volatile boolean isRunning = true;

        @Override
        public void open(Configuration parameters) throws Exception {
            System.out.println("[source] invoke open()");
        }

        @Override
        public void close() throws Exception {
            isRunning = false;
            System.out.println("[source] invoke close()");
        }

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            System.out.println("[source] invoke run()");
            while (isRunning) {
                ctx.collect("Flink");
                Thread.sleep(500);
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
            System.out.println("[source] invoke cancel()");
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
            System.out.println("[source] invoke snapshotState()");
        }

        @Override
        public void initializeState(FunctionInitializationContext context) 
throws Exception {
            System.out.println("[source] invoke initializeState()");
        }

    }

    public static class ExampleSink extends PrintSinkFunction<String>
            implements CheckpointedFunction {

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
            System.out.println("[sink] invoke snapshotState()");
        }

        @Override
        public void initializeState(FunctionInitializationContext context) 
throws Exception {
            System.out.println("[sink] invoke initializeState()");
        }
    }

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env =
                
StreamExecutionEnvironment.getExecutionEnvironment().enableCheckpointing(1000);

        DataStream<String> stream = env.addSource(new ExampleSource());

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        String topic = "my-topic";

        FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
                topic,
                (element, timestamp) -> {
                    byte[] value = element.getBytes(StandardCharsets.UTF_8);
                    return new ProducerRecord<>(topic, null, timestamp, null, 
value, null);
                },
                properties,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

        stream.addSink(kafkaProducer);
        env.execute();
    }

}{code}
{code:java}
$ java -jar ./example.jar

[source] invoke initializeState()
[source] invoke open()
[source] invoke run()
[source] invoke cancel()
[source] invoke close()
[source] invoke initializeState()
[source] invoke open()
[source] invoke run()
[source] invoke cancel()
[source] invoke close()
[source] invoke initializeState()
[source] invoke open()
[source] invoke run()
[source] invoke snapshotState()
[source] invoke cancel()
[source] invoke close()
^C%
{code}

  was:
* +*Case 1*:+ Work as expected

{code:java}
public class Example {

    public static class ExampleSource extends RichSourceFunction<String>
            implements CheckpointedFunction {

        private volatile boolean isRunning = true;

        @Override
        public void open(Configuration parameters) throws Exception {
            System.out.println("[source] invoke open()");
        }

        @Override
        public void close() throws Exception {
            isRunning = false;
            System.out.println("[source] invoke close()");
        }

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            System.out.println("[source] invoke run()");
            while (isRunning) {
                ctx.collect("Flink");
                Thread.sleep(500);
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
            System.out.println("[source] invoke cancel()");
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
            System.out.println("[source] invoke snapshotState()");
        }

        @Override
        public void initializeState(FunctionInitializationContext context) 
throws Exception {
            System.out.println("[source] invoke initializeState()");
        }

    }

    public static class ExampleSink extends PrintSinkFunction<String>
            implements CheckpointedFunction {

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
            System.out.println("[sink] invoke snapshotState()");
        }

        @Override
        public void initializeState(FunctionInitializationContext context) 
throws Exception {
            System.out.println("[sink] invoke initializeState()");
        }
    }

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env =
                
StreamExecutionEnvironment.getExecutionEnvironment().enableCheckpointing(1000);

        DataStream<String> stream = env.addSource(new ExampleSource());
        stream.addSink(new ExampleSink()).setParallelism(1);

        env.execute();
    }
}
{code}
{code:java}
$ java -jar ./example.jar

[sink] invoke initializeState()
[source] invoke initializeState()
[source] invoke open()
[source] invoke run()
Flink
[sink] invoke snapshotState()
[source] invoke snapshotState()
Flink
Flink
[sink] invoke snapshotState()
[source] invoke snapshotState()
Flink
Flink
[sink] invoke snapshotState()
[source] invoke snapshotState()
^C
{code}
 * *+Case 2:+* Run as unexpected (w/ _parallelism = 1_)

{code:java}
public class Example {

    public static class ExampleSource extends RichSourceFunction<String>
            implements CheckpointedFunction {

        private volatile boolean isRunning = true;

        @Override
        public void open(Configuration parameters) throws Exception {
            System.out.println("[source] invoke open()");
        }

        @Override
        public void close() throws Exception {
            isRunning = false;
            System.out.println("[source] invoke close()");
        }

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            System.out.println("[source] invoke run()");
            while (isRunning) {
                ctx.collect("Flink");
                Thread.sleep(500);
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
            System.out.println("[source] invoke cancel()");
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
            System.out.println("[source] invoke snapshotState()");
        }

        @Override
        public void initializeState(FunctionInitializationContext context) 
throws Exception {
            System.out.println("[source] invoke initializeState()");
        }
    }


    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env =
                
StreamExecutionEnvironment.getExecutionEnvironment().enableCheckpointing(1000);

        DataStream<String> stream = env.addSource(new ExampleSource());

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        String topic = "my-topic";

        FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
                topic,
                (element, timestamp) -> {
                    byte[] value = element.getBytes(StandardCharsets.UTF_8);
                    return new ProducerRecord<>(topic, null, timestamp, null, 
value, null);
                },
                properties,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

        stream.addSink(kafkaProducer).setParallelism(1);

        env.execute();
    }

}
{code}
{code:java}
 $ java -jar ./example.jar

[source] invoke cancel() 
[source] invoke cancel() 
[source] invoke cancel() 
[source] invoke cancel() 
^C%
{code}
+*Case 3*+: Run as unexpected (w/ _parallelism = defaul_t)
{code:java}
public class Example {

    public static class ExampleSource extends RichSourceFunction<String>
            implements CheckpointedFunction {

        private volatile boolean isRunning = true;

        @Override
        public void open(Configuration parameters) throws Exception {
            System.out.println("[source] invoke open()");
        }

        @Override
        public void close() throws Exception {
            isRunning = false;
            System.out.println("[source] invoke close()");
        }

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            System.out.println("[source] invoke run()");
            while (isRunning) {
                ctx.collect("Flink");
                Thread.sleep(500);
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
            System.out.println("[source] invoke cancel()");
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
            System.out.println("[source] invoke snapshotState()");
        }

        @Override
        public void initializeState(FunctionInitializationContext context) 
throws Exception {
            System.out.println("[source] invoke initializeState()");
        }

    }

    public static class ExampleSink extends PrintSinkFunction<String>
            implements CheckpointedFunction {

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
            System.out.println("[sink] invoke snapshotState()");
        }

        @Override
        public void initializeState(FunctionInitializationContext context) 
throws Exception {
            System.out.println("[sink] invoke initializeState()");
        }
    }

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env =
                
StreamExecutionEnvironment.getExecutionEnvironment().enableCheckpointing(1000);

        DataStream<String> stream = env.addSource(new ExampleSource());

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        String topic = "my-topic";

        FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
                topic,
                (element, timestamp) -> {
                    byte[] value = element.getBytes(StandardCharsets.UTF_8);
                    return new ProducerRecord<>(topic, null, timestamp, null, 
value, null);
                },
                properties,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

        stream.addSink(kafkaProducer);
        env.execute();
    }

}{code}
{code:java}
$ java -jar ./example.jar

[source] invoke initializeState()
[source] invoke open()
[source] invoke run()
[source] invoke cancel()
[source] invoke close()
[source] invoke initializeState()
[source] invoke open()
[source] invoke run()
[source] invoke cancel()
[source] invoke close()
[source] invoke initializeState()
[source] invoke open()
[source] invoke run()
[source] invoke snapshotState()
[source] invoke cancel()
[source] invoke close()
^C%
{code}



> checkpoints: strange behaviour 
> -------------------------------
>
>                 Key: FLINK-22551
>                 URL: https://issues.apache.org/jira/browse/FLINK-22551
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.13.0
>         Environment: {code:java}
>  java -version
> openjdk version "11.0.2" 2019-01-15
> OpenJDK Runtime Environment 18.9 (build 11.0.2+9)
> OpenJDK 64-Bit Server VM 18.9 (build 11.0.2+9, mixed mode)
> {code}
>            Reporter: buom
>            Priority: Critical
>
> * +*Case 1*:+ Work as expected
> {code:java}
> public class Example {
>     public static class ExampleSource extends RichSourceFunction<String>
>             implements CheckpointedFunction {
>         private volatile boolean isRunning = true;
>         @Override
>         public void open(Configuration parameters) throws Exception {
>             System.out.println("[source] invoke open()");
>         }
>         @Override
>         public void close() throws Exception {
>             isRunning = false;
>             System.out.println("[source] invoke close()");
>         }
>         @Override
>         public void run(SourceContext<String> ctx) throws Exception {
>             System.out.println("[source] invoke run()");
>             while (isRunning) {
>                 ctx.collect("Flink");
>                 Thread.sleep(500);
>             }
>         }
>         @Override
>         public void cancel() {
>             isRunning = false;
>             System.out.println("[source] invoke cancel()");
>         }
>         @Override
>         public void snapshotState(FunctionSnapshotContext context) throws 
> Exception {
>             System.out.println("[source] invoke snapshotState()");
>         }
>         @Override
>         public void initializeState(FunctionInitializationContext context) 
> throws Exception {
>             System.out.println("[source] invoke initializeState()");
>         }
>     }
>     public static class ExampleSink extends PrintSinkFunction<String>
>             implements CheckpointedFunction {
>         @Override
>         public void snapshotState(FunctionSnapshotContext context) throws 
> Exception {
>             System.out.println("[sink] invoke snapshotState()");
>         }
>         @Override
>         public void initializeState(FunctionInitializationContext context) 
> throws Exception {
>             System.out.println("[sink] invoke initializeState()");
>         }
>     }
>     public static void main(String[] args) throws Exception {
>         final StreamExecutionEnvironment env =
>                 
> StreamExecutionEnvironment.getExecutionEnvironment().enableCheckpointing(1000);
>         DataStream<String> stream = env.addSource(new ExampleSource());
>         stream.addSink(new ExampleSink()).setParallelism(1);
>         env.execute();
>     }
> }
> {code}
> {code:java}
> $ java -jar ./example.jar
> [sink] invoke initializeState()
> [source] invoke initializeState()
> [source] invoke open()
> [source] invoke run()
> Flink
> [sink] invoke snapshotState()
> [source] invoke snapshotState()
> Flink
> Flink
> [sink] invoke snapshotState()
> [source] invoke snapshotState()
> Flink
> Flink
> [sink] invoke snapshotState()
> [source] invoke snapshotState()
> ^C
> {code}
>  * *+Case 2:+* Run as unexpected (w/ _parallelism = 1_)
> {code:java}
> public class Example {
>     public static class ExampleSource extends RichSourceFunction<String>
>             implements CheckpointedFunction {
>         private volatile boolean isRunning = true;
>         @Override
>         public void open(Configuration parameters) throws Exception {
>             System.out.println("[source] invoke open()");
>         }
>         @Override
>         public void close() throws Exception {
>             isRunning = false;
>             System.out.println("[source] invoke close()");
>         }
>         @Override
>         public void run(SourceContext<String> ctx) throws Exception {
>             System.out.println("[source] invoke run()");
>             while (isRunning) {
>                 ctx.collect("Flink");
>                 Thread.sleep(500);
>             }
>         }
>         @Override
>         public void cancel() {
>             isRunning = false;
>             System.out.println("[source] invoke cancel()");
>         }
>         @Override
>         public void snapshotState(FunctionSnapshotContext context) throws 
> Exception {
>             System.out.println("[source] invoke snapshotState()");
>         }
>         @Override
>         public void initializeState(FunctionInitializationContext context) 
> throws Exception {
>             System.out.println("[source] invoke initializeState()");
>         }
>     }
>     public static void main(String[] args) throws Exception {
>         final StreamExecutionEnvironment env =
>                 
> StreamExecutionEnvironment.getExecutionEnvironment().enableCheckpointing(1000);
>         DataStream<String> stream = env.addSource(new ExampleSource());
>         Properties properties = new Properties();
>         properties.setProperty("bootstrap.servers", "localhost:9092");
>         String topic = "my-topic";
>         FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
>                 topic,
>                 (element, timestamp) -> {
>                     byte[] value = element.getBytes(StandardCharsets.UTF_8);
>                     return new ProducerRecord<>(topic, null, timestamp, null, 
> value, null);
>                 },
>                 properties,
>                 FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
>         stream.addSink(kafkaProducer).setParallelism(1);
>         env.execute();
>     }
> }
> {code}
> {code:java}
>  $ java -jar ./example.jar
> [source] invoke cancel() 
> [source] invoke cancel() 
> [source] invoke cancel() 
> [source] invoke cancel() 
> ^C%
> {code}
> +*Case 3*+: Run as unexpected (wo/ _parallelism_)
> {code:java}
> public class Example {
>     public static class ExampleSource extends RichSourceFunction<String>
>             implements CheckpointedFunction {
>         private volatile boolean isRunning = true;
>         @Override
>         public void open(Configuration parameters) throws Exception {
>             System.out.println("[source] invoke open()");
>         }
>         @Override
>         public void close() throws Exception {
>             isRunning = false;
>             System.out.println("[source] invoke close()");
>         }
>         @Override
>         public void run(SourceContext<String> ctx) throws Exception {
>             System.out.println("[source] invoke run()");
>             while (isRunning) {
>                 ctx.collect("Flink");
>                 Thread.sleep(500);
>             }
>         }
>         @Override
>         public void cancel() {
>             isRunning = false;
>             System.out.println("[source] invoke cancel()");
>         }
>         @Override
>         public void snapshotState(FunctionSnapshotContext context) throws 
> Exception {
>             System.out.println("[source] invoke snapshotState()");
>         }
>         @Override
>         public void initializeState(FunctionInitializationContext context) 
> throws Exception {
>             System.out.println("[source] invoke initializeState()");
>         }
>     }
>     public static class ExampleSink extends PrintSinkFunction<String>
>             implements CheckpointedFunction {
>         @Override
>         public void snapshotState(FunctionSnapshotContext context) throws 
> Exception {
>             System.out.println("[sink] invoke snapshotState()");
>         }
>         @Override
>         public void initializeState(FunctionInitializationContext context) 
> throws Exception {
>             System.out.println("[sink] invoke initializeState()");
>         }
>     }
>     public static void main(String[] args) throws Exception {
>         final StreamExecutionEnvironment env =
>                 
> StreamExecutionEnvironment.getExecutionEnvironment().enableCheckpointing(1000);
>         DataStream<String> stream = env.addSource(new ExampleSource());
>         Properties properties = new Properties();
>         properties.setProperty("bootstrap.servers", "localhost:9092");
>         String topic = "my-topic";
>         FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
>                 topic,
>                 (element, timestamp) -> {
>                     byte[] value = element.getBytes(StandardCharsets.UTF_8);
>                     return new ProducerRecord<>(topic, null, timestamp, null, 
> value, null);
>                 },
>                 properties,
>                 FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
>         stream.addSink(kafkaProducer);
>         env.execute();
>     }
> }{code}
> {code:java}
> $ java -jar ./example.jar
> [source] invoke initializeState()
> [source] invoke open()
> [source] invoke run()
> [source] invoke cancel()
> [source] invoke close()
> [source] invoke initializeState()
> [source] invoke open()
> [source] invoke run()
> [source] invoke cancel()
> [source] invoke close()
> [source] invoke initializeState()
> [source] invoke open()
> [source] invoke run()
> [source] invoke snapshotState()
> [source] invoke cancel()
> [source] invoke close()
> ^C%
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to