[ 
https://issues.apache.org/jira/browse/FLINK-9640?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hai Zhou updated FLINK-9640:
----------------------------
    Description: 
Steps to reproduce:
1. Set up a standalone Flink cluster.
2. Submit a test job such as the following:
{code:java}
import java.util.UUID;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DemoJob {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.disableOperatorChaining();
        env.setParallelism(4);
        // Trigger a checkpoint every 3000 ms.
        env.enableCheckpointing(3000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        DataStream<String> inputStream = env.addSource(new StringGeneratorParallelSourceFunction());

        inputStream.map(String::hashCode).print();

        env.execute();
    }

    public static class StringGeneratorParallelSourceFunction extends RichParallelSourceFunction<String> {
        private static final Logger LOG = LoggerFactory.getLogger(StringGeneratorParallelSourceFunction.class);
        private volatile boolean running = true;
        private int index;
        private int subtask_nums;

        @Override
        public void open(Configuration parameters) throws Exception {
            index = getRuntimeContext().getIndexOfThisSubtask();
            subtask_nums = getRuntimeContext().getNumberOfParallelSubtasks();
        }

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            while (running) {
                String data = UUID.randomUUID().toString();
                ctx.collect(data);
                LOG.info("subtask_index = {}, emit string = {}", index, data);
                Thread.sleep(50);
                // With parallelism 4, the subtask with index 2 stops after its
                // first record, so exactly one source subtask reaches FINISHED
                // while the others keep running.
                if (index == subtask_nums / 2) {
                    running = false;
                    LOG.info("subtask_index = {}, finished.", index);
                }
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}
{code}

3. Observe the JobManager and TaskManager logs, excerpted below.
*taskmanager.log*
{code:java}
2018-06-21 17:05:54,144 INFO  com.yew1eb.flink.demo.DemoJob$StringGeneratorParallelSourceFunction  - subtask_index = 2, emit string = 5b0c2467-ad2e-4b53-b1a4-7a0f64560570
2018-06-21 17:05:54,151 INFO  com.yew1eb.flink.demo.DemoJob$StringGeneratorParallelSourceFunction  - subtask_index = 0, emit string = 11af78b3-59ea-467c-a267-7c2238e44ffe
2018-06-21 17:05:54,195 INFO  com.yew1eb.flink.demo.DemoJob$StringGeneratorParallelSourceFunction  - subtask_index = 2, finished.
2018-06-21 17:05:54,200 INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: Custom Source (3/4) (6b2a374bec5f31112811613537dd4fd9) switched from RUNNING to FINISHED.
2018-06-21 17:05:54,201 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Source: Custom Source (3/4) (6b2a374bec5f31112811613537dd4fd9).
2018-06-21 17:05:54,201 INFO  org.apache.flink.runtime.taskmanager.Task                     - Ensuring all FileSystem streams are closed for task Source: Custom Source (3/4) (6b2a374bec5f31112811613537dd4fd9) [FINISHED]
2018-06-21 17:05:54,202 INFO  org.apache.flink.yarn.YarnTaskManager                         - Un-registering task and sending final execution state FINISHED to JobManager for task Source: Custom Source (6b2a374bec5f31112811613537dd4fd9)
2018-06-21 17:05:54,211 INFO  com.yew1eb.flink.demo.DemoJob$StringGeneratorParallelSourceFunction  - subtask_index = 0, emit string = f29f48fd-ca53-4a96-b596-93948c09581d
{code}

*jobmanager.log*
{code:java}
2018-06-21 17:05:52,682 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink: Unnamed (2/4) (3aee8bf5103065e8b57ebd0e214141ae) switched from DEPLOYING to RUNNING.
2018-06-21 17:05:52,683 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map (2/4) (de90106d04f63cb9ea531308202e233f) switched from DEPLOYING to RUNNING.
2018-06-21 17:05:54,219 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source (3/4) (6b2a374bec5f31112811613537dd4fd9) switched from RUNNING to FINISHED.
2018-06-21 17:05:54,224 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map (3/4) (8f523afb97dc848a9578f9cae5870421) switched from RUNNING to FINISHED.
2018-06-21 17:05:54,224 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink: Unnamed (3/4) (39f76a87d20cfc491e11f0b5b08ec5c2) switched from RUNNING to FINISHED.
2018-06-21 17:05:55,069 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Checkpoint triggering task Source: Custom Source (3/4) is not being executed at the moment. Aborting checkpoint.
2018-06-21 17:05:58,067 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Checkpoint triggering task Source: Custom Source (3/4) is not being executed at the moment. Aborting checkpoint.
2018-06-21 17:06:01,067 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Checkpoint triggering task Source: Custom Source (3/4) is not being executed at the moment. Aborting checkpoint.
2018-06-21 17:06:04,067 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Checkpoint triggering task Source: Custom Source (3/4) is not being executed at the moment. Aborting checkpoint.
{code}

-------------------------------------------------------------------------------------------------

I think we should filter out {{FINISHED}} tasks from the sets of tasks that 
need to trigger a checkpoint or acknowledge a checkpoint. For more details, see 
the `org.apache.flink.runtime.checkpoint.CheckpointCoordinator` class.
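
The proposed filtering can be sketched roughly as follows. This is a minimal, self-contained illustration built on a simplified stand-in model, not Flink's actual `CheckpointCoordinator` code: the names `FinishedTaskFilterSketch`, `SubtaskState`, `Subtask`, `withoutFinished`, and `canTrigger` are all hypothetical and exist only for this example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Simplified stand-in model; none of these types are Flink classes.
public class FinishedTaskFilterSketch {

    // Hypothetical subset of the states a subtask can be in.
    enum SubtaskState { DEPLOYING, RUNNING, FINISHED }

    static final class Subtask {
        final String name;
        final SubtaskState state;

        Subtask(String name, SubtaskState state) {
            this.name = name;
            this.state = state;
        }
    }

    // Drop FINISHED subtasks so they are neither asked to trigger a
    // checkpoint nor awaited for an acknowledgement.
    static List<Subtask> withoutFinished(List<Subtask> subtasks) {
        return subtasks.stream()
                .filter(t -> t.state != SubtaskState.FINISHED)
                .collect(Collectors.toList());
    }

    // A checkpoint is attempted only if every given subtask is RUNNING.
    static boolean canTrigger(List<Subtask> subtasks) {
        return subtasks.stream().allMatch(t -> t.state == SubtaskState.RUNNING);
    }

    public static void main(String[] args) {
        // Mirrors the situation in the logs: source subtask 3/4 has finished.
        List<Subtask> sources = Arrays.asList(
                new Subtask("Source: Custom Source (1/4)", SubtaskState.RUNNING),
                new Subtask("Source: Custom Source (2/4)", SubtaskState.RUNNING),
                new Subtask("Source: Custom Source (3/4)", SubtaskState.FINISHED),
                new Subtask("Source: Custom Source (4/4)", SubtaskState.RUNNING));

        System.out.println("without filter: " + canTrigger(sources));
        System.out.println("with filter: " + canTrigger(withoutFinished(sources)));
    }
}
```

In this model the unfiltered check fails because of the single finished subtask, which corresponds to the repeated "Aborting checkpoint" messages in the JobManager log, while the filtered check passes.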


> Checkpointing is always aborted if any task has been finished
> -------------------------------------------------------------
>
>                 Key: FLINK-9640
>                 URL: https://issues.apache.org/jira/browse/FLINK-9640
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.3.0, 1.4.0, 1.5.0, 1.6.0
>            Reporter: Hai Zhou
>            Assignee: Hai Zhou
>            Priority: Major
>             Fix For: 1.6.0



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
