lindong28 commented on code in PR #20275:
URL: https://github.com/apache/flink/pull/20275#discussion_r936161829


##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.java:
##########
@@ -0,0 +1,559 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.accumulators.ListAccumulator;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.operators.coordination.CoordinatorEventsExactlyOnceITCase;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Integration test case that validates the exactly-once mechanism for operator events sent around
+ * checkpoint. This class is an extension to {@link CoordinatorEventsExactlyOnceITCase}, further
+ * verifying the exactly-once semantics of events in the following conditions:
+ *
+ * <h2>Stream operator recipient</h2>
+ *
+ * <p>In {@link CoordinatorEventsExactlyOnceITCase}, the test cases focus on verifying the
+ * correctness of operator coordinator's behavior. It uses a custom {@link AbstractInvokable}
+ * subclass that mocks the behavior of coordinator's recipients. This test class uses actual stream
+ * operators as the recipient of coordinator events, verifying that stream operators can correctly
+ * handle received operator events and inform coordinators of completing checkpoint.
+ *
+ * <h2>Non-source stream task</h2>
+ *
+ * <p>In {@link CoordinatorEventsExactlyOnceITCase}, the two tested tasks are executed independently
+ * of each other. They do not have the upstream-downstream relationship as operators usually do in a
+ * streaming job, and thus both of them are treated as source tasks. This test class further
+ * verifies situations when the tested operators are not sources, which means when checkpoint
+ * barriers are injected into sources, these operators may not have started checkpoint yet.
+ *
+ * <h2>Unaligned checkpoint</h2>
+ *
+ * <p>This class tests both aligned and unaligned checkpoints to verify that the correctness of the
+ * event delivery behavior around checkpoint is not affected by this condition.
+ *
+ * <h2>Non-global failure</h2>
+ *
+ * <p>In {@link CoordinatorEventsExactlyOnceITCase}, failures occur at the coordinators' side, so
+ * they will cause the whole Flink job to fail over. In this class, test cases are added when there
+ * might only be fail-overs on the subtasks' side, while the coordinators are not affected. In this
+ * case the production infrastructure code needs to work together with user code (implementations
+ * inside the coordinator and operator subclass) to ensure the exactly-once semantics of operator
+ * event delivery.
+ *
+ * <p>In the test cases of this class, the tested coordinator would inject failure during its
+ * sending operator events. Besides, it is additionally guaranteed that there must have been a
+ * checkpoint completed before the failure is injected, and that there must be events sent from the
+ * coordinator to its subtask during checkpoint.
+ */
+public class CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase
+        extends CoordinatorEventsExactlyOnceITCase {
+
+    private static final int NUM_EVENTS = 100;
+
+    private static final int DELAY = 1;
+
+    /** Whether the coordinator has sent any event to its subtask after any checkpoint. */
+    private static boolean sentEventAfterCkpt;
+
+    /** Whether the {@link IdlingSourceFunction} should be closed to finish the job. */
+    private static boolean shouldCloseSource;
+
+    private StreamExecutionEnvironment env;
+
+    @Before
+    public void setup() {
+        env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.enableCheckpointing(100);
+        shouldCloseSource = false;
+        sentEventAfterCkpt = false;
+    }
+
+    @Test
+    public void testCoordinatorSendEvents() throws Exception {

Review Comment:
   Can we make the test names consistent with each other?



##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.java:
##########
@@ -0,0 +1,559 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.accumulators.ListAccumulator;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import 
org.apache.flink.runtime.operators.coordination.CoordinatorEventsExactlyOnceITCase;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Integration test case that validates the exactly-once mechanism for 
operator events sent around
+ * checkpoint. This class is an extension to {@link 
CoordinatorEventsExactlyOnceITCase}, further
+ * verifying the exactly-once semantics of events in the following conditions:
+ *
+ * <h2>Stream operator recipient</h2>
+ *
+ * <p>In {@link CoordinatorEventsExactlyOnceITCase}, the test cases focus on 
verifying the
+ * correctness of operator coordinator's behavior. It uses a custom {@link 
AbstractInvokable}
+ * subclass that mocks the behavior of coordinator's recipients. This test 
class uses actual stream
+ * operators as the recipient of coordinator events, verifying that stream 
operators can correctly
+ * handle received operator events and inform coordinators of completing 
checkpoint.
+ *
+ * <h2>Non-source stream task</h2>
+ *
+ * <p>In {@link CoordinatorEventsExactlyOnceITCase}, the two tested tasks are 
executed independently
+ * of each other. They do not have the upstream-downstream relationship as 
operators usually do in a
+ * streaming job, and thus both of them are treated as source tasks. This test 
class further
+ * verifies situations when the tested operators are not sources, which means 
when checkpoint
+ * barriers are injected into sources, these operators may not have started 
checkpoint yet.
+ *
+ * <h2>Unaligned checkpoint</h2>
+ *
+ * <p>This class tests both aligned and unaligned checkpoints to verify that 
the correctness of the
+ * event delivery behavior around checkpoint is not affected by this condition.
+ *
+ * <h2>Non-global failure</h2>
+ *
+ * <p>In {@link CoordinatorEventsExactlyOnceITCase}, failures occur at the 
coordinators' side, so
+ * they will cause the whole Flink job to fail over. In this class, test cases 
are added when there
+ * might only be fail-overs on the subtasks' side, while the coordinators are 
not affected. In this
+ * case the production infrastructure code needs to work together with user 
code (implementations
+ * inside the coordinator and operator subclass) to ensure the exactly-once 
semantics of operator
+ * event delivery.
+ *
+ * <p>In the test cases of this class, the tested coordinator would inject 
failure during its
+ * sending operator events. Besides, it is additionally guaranteed that there 
must have been a
+ * checkpoint completed before the failure is injected, and that there must be 
events sent from the
+ * coordinator to its subtask during checkpoint.
+ */
+public class CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase
+        extends CoordinatorEventsExactlyOnceITCase {
+
+    private static final int NUM_EVENTS = 100;
+
+    private static final int DELAY = 1;
+
+    /** Whether the coordinator has sent any event to its subtask after any 
checkpoint. */
+    private static boolean sentEventAfterCkpt;
+
+    /** Whether the {@link IdlingSourceFunction} should be closed to finish 
the job. */
+    private static boolean shouldCloseSource;
+
+    private StreamExecutionEnvironment env;
+
+    @Before
+    public void setup() {
+        env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.enableCheckpointing(100);
+        shouldCloseSource = false;
+        sentEventAfterCkpt = false;
+    }
+
+    @Test
+    public void testCoordinatorSendEvents() throws Exception {
+        executeAndVerifyResults(
+                env, new EventReceivingOperatorFactory<>("eventReceiving", NUM_EVENTS, DELAY));
+    }
+
+    @Test
+    public void testUnalignedCheckpoint() throws Exception {
+        env.getCheckpointConfig().enableUnalignedCheckpoints();
+        executeAndVerifyResults(
+                env, new EventReceivingOperatorFactory<>("eventReceiving", NUM_EVENTS, DELAY));
+    }
+
+    @Test
+    public void testFailingCheckpoint() throws Exception {
+        executeAndVerifyResults(
+                env,
+                new FailingCheckpointOperatorFactory<>("failingCheckpoint", NUM_EVENTS, DELAY));
+        assertThat(TestScript.getForOperator("failingCheckpoint-subtask0").hasAlreadyFailed())
+                .isTrue();
+    }
+
+    @Test
+    public void testFailingUnalignedCheckpoint() throws Exception {
+        env.getCheckpointConfig().enableUnalignedCheckpoints();
+        executeAndVerifyResults(
+                env,
+                new FailingCheckpointOperatorFactory<>("failingCheckpoint", NUM_EVENTS, DELAY));
+        assertThat(TestScript.getForOperator("failingCheckpoint-subtask0").hasAlreadyFailed())
+                .isTrue();
+    }
+
+    private void executeAndVerifyResults(
+            StreamExecutionEnvironment env, 
EventReceivingOperatorFactory<Long, Long> factory)
+            throws Exception {
+        // The event receiving operator is not chained together with the 
source operator, so that
+        // when checkpoint barriers are injected into sources, the event 
receiving operator has not
+        // started checkpoint yet.
+        env.addSource(new IdlingSourceFunction<>(), 
TypeInformation.of(Long.class))
+                .transform(
+                        "blockCheckpointBarrier",
+                        TypeInformation.of(Long.class),
+                        new BlockCheckpointBarrierOperator<>())
+                .disableChaining()
+                .transform(factory.name, TypeInformation.of(Long.class), 
factory)
+                .addSink(new DiscardingSink<>());
+
+        JobExecutionResult executionResult =
+                MINI_CLUSTER
+                        .getMiniCluster()
+                        
.executeJobBlocking(env.getStreamGraph().getJobGraph());
+
+        List<Integer> receivedInts =
+                
executionResult.getAccumulatorResult(EventReceivingOperator.ACCUMULATOR_NAME);
+        checkListContainsSequence(receivedInts, NUM_EVENTS);
+    }
+
+    /** A mock source function that does not collect any stream record and 
finishes on demand. */
+    private static class IdlingSourceFunction<T> extends RichSourceFunction<T>
+            implements ParallelSourceFunction<T> {
+        @Override
+        public void run(SourceContext<T> ctx) throws Exception {
+            while (!shouldCloseSource) {
+                Thread.sleep(100);
+            }
+        }
+
+        @Override
+        public void cancel() {}
+    }
+
+    /**
+     * A stream operator that blocks the checkpoint barrier until the 
coordinator has sent events to
+     * its subtask. It helps to guarantee that there are events being sent 
when the coordinator has
+     * completed the checkpoint while the subtask has not yet.
+     */
+    private static class BlockCheckpointBarrierOperator<T> extends 
AbstractStreamOperator<T>
+            implements OneInputStreamOperator<T, T> {
+
+        @Override
+        public void processElement(StreamRecord<T> element) throws Exception {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void snapshotState(StateSnapshotContext context) throws 
Exception {
+            super.snapshotState(context);
+            while (!sentEventAfterCkpt) {
+                Thread.sleep(100);
+            }
+        }
+    }
+
+    /**
+     * A wrapper operator factory for {@link EventReceivingOperator} and {@link
+     * EventSendingCoordinator}.
+     */
+    private static class EventReceivingOperatorFactory<IN, OUT>
+            extends AbstractStreamOperatorFactory<OUT>
+            implements CoordinatedOperatorFactory<OUT>, 
OneInputStreamOperatorFactory<IN, OUT> {
+
+        protected final String name;
+
+        protected final int numEvents;
+
+        private final int delay;
+
+        public EventReceivingOperatorFactory(String name, int numEvents, int 
delay) {
+            this.name = name;
+            this.numEvents = numEvents;
+            this.delay = delay;
+        }
+
+        @Override
+        public OperatorCoordinator.Provider getCoordinatorProvider(
+                String operatorName, OperatorID operatorID) {
+            return new OperatorCoordinator.Provider() {
+
+                @Override
+                public OperatorID getOperatorId() {
+                    return operatorID;
+                }
+
+                @Override
+                public OperatorCoordinator create(OperatorCoordinator.Context 
context) {
+                    return new EventSendingCoordinator(context, name, 
numEvents, delay);
+                }
+            };
+        }
+
+        @Override
+        public <T extends StreamOperator<OUT>> T createStreamOperator(
+                StreamOperatorParameters<OUT> parameters) {
+            EventReceivingOperator<OUT> operator = new 
EventReceivingOperator<>();
+            operator.setup(
+                    parameters.getContainingTask(),
+                    parameters.getStreamConfig(),
+                    parameters.getOutput());
+            parameters
+                    .getOperatorEventDispatcher()
+                    
.registerEventHandler(parameters.getStreamConfig().getOperatorID(), operator);
+            return (T) operator;
+        }
+
+        @Override
+        public Class<? extends StreamOperator> 
getStreamOperatorClass(ClassLoader classLoader) {
+            return EventReceivingOperator.class;
+        }
+    }
+
+    /**
+     * A subclass of {@link CoordinatorEventsExactlyOnceITCase.EventSendingCoordinator} that has the
+     * following additional behavior:
+     *
+     * <ul>
+     *   <li>The job must have completed a checkpoint before the coordinator injects the failure.
+     *   <li>The failure would be injected after the coordinator has completed its first checkpoint
+     *       and before it completes the next.
+     *   <li>There must be events being sent when the coordinator has completed the first checkpoint
+     *       while the subtask has not.
+     * </ul>
+     */
+    private static class EventSendingCoordinator
+            extends CoordinatorEventsExactlyOnceITCase.EventSendingCoordinator {
+        private static final long NO_CHECKPOINT = -1;
+
+        /** The coordinator should have completed checkpoints before sending out this message. */
+        private final int checkpointBeforeMessage;
+
+        /**
+         * The id of a completed checkpoint for the job.
+         *
+         * <ul>
+         *   <li>If not set(equal to {@link #NO_CHECKPOINT}), it will be set to the next completed
+         *       checkpoint.
+         *   <li>If the coordinator is restored from the checkpoint, it will be set to the restored
+         *       checkpoint.
+         *   <li>Otherwise, its value will not be changed.
+         * </ul>
+         *
+         * <p>It is used to assist the setting of {@link #hasCompletedCheckpointForCoordinator} and
+         * {@link #hasCompletedCheckpointForJob}.
+         */
+        private long markedCompletedCheckpointId;

Review Comment:
   How about `lastCompletedCheckpointId`?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java:
##########
@@ -350,10 +345,10 @@ public void start() throws Exception {}
         @Override
         public void close() throws Exception {
             scheduledExecutor.shutdownNow();
-            assertTrue(scheduledExecutor.awaitTermination(10, TimeUnit.MINUTES));
+            assertThat(scheduledExecutor.awaitTermination(10, TimeUnit.MINUTES)).isTrue();
 
             mailboxExecutor.shutdownNow();
-            assertTrue(mailboxExecutor.awaitTermination(10, TimeUnit.MINUTES));
+            assertThat(scheduledExecutor.awaitTermination(10, TimeUnit.MINUTES)).isTrue();

Review Comment:
   Do you mean to use `mailboxExecutor` here?
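
To illustrate the concern: the removed line awaited `mailboxExecutor`, while the added line awaits `scheduledExecutor` a second time, so the termination of `mailboxExecutor` would no longer be checked. Below is a minimal standalone sketch of the presumably intended pattern, where each executor is shut down and then awaited itself. The class name `ShutdownBothExecutors`, the helper `shutdownAndAwait`, and the 10-second timeout are hypothetical and only for illustration; the executors are plain JDK stand-ins, not the fields from the test.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ShutdownBothExecutors {

    /** Shuts down the given executor and reports whether it terminated within the timeout. */
    static boolean shutdownAndAwait(ExecutorService executor) throws InterruptedException {
        executor.shutdownNow();
        // Await the same executor that was just shut down, not a different one.
        return executor.awaitTermination(10, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        // Stand-ins for the two executors closed in close(); assumed types, for illustration only.
        ExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
        ExecutorService mailboxExecutor = Executors.newSingleThreadExecutor();

        if (!shutdownAndAwait(scheduledExecutor)) {
            throw new AssertionError("scheduledExecutor did not terminate");
        }
        if (!shutdownAndAwait(mailboxExecutor)) {
            throw new AssertionError("mailboxExecutor did not terminate");
        }
    }
}
```

In the PR itself, this would presumably amount to changing the second added assertion to call `mailboxExecutor.awaitTermination(...)`.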



##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.java:
##########
@@ -0,0 +1,559 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.accumulators.ListAccumulator;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import 
org.apache.flink.runtime.operators.coordination.CoordinatorEventsExactlyOnceITCase;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Integration test case that validates the exactly-once mechanism for 
operator events sent around
+ * checkpoint. This class is an extension to {@link 
CoordinatorEventsExactlyOnceITCase}, further
+ * verifying the exactly-once semantics of events in the following conditions:
+ *
+ * <h2>Stream operator recipient</h2>
+ *
+ * <p>In {@link CoordinatorEventsExactlyOnceITCase}, the test cases focus on 
verifying the
+ * correctness of operator coordinator's behavior. It uses a custom {@link 
AbstractInvokable}
+ * subclass that mocks the behavior of coordinator's recipients. This test 
class uses actual stream
+ * operators as the recipient of coordinator events, verifying that stream 
operators can correctly
+ * handle received operator events and inform coordinators of completing 
checkpoint.
+ *
+ * <h2>Non-source stream task</h2>
+ *
+ * <p>In {@link CoordinatorEventsExactlyOnceITCase}, the two tested tasks are 
executed independently
+ * of each other. They do not have the upstream-downstream relationship as 
operators usually do in a
+ * streaming job, and thus both of them are treated as source tasks. This test 
class further
+ * verifies situations when the tested operators are not sources, which means 
when checkpoint
+ * barriers are injected into sources, these operators may not have started 
checkpoint yet.
+ *
+ * <h2>Unaligned checkpoint</h2>
+ *
+ * <p>This class tests both aligned and unaligned checkpoints to verify that 
the correctness of the
+ * event delivery behavior around checkpoint is not affected by this condition.
+ *
+ * <h2>Non-global failure</h2>
+ *
+ * <p>In {@link CoordinatorEventsExactlyOnceITCase}, failures occur at the 
coordinators' side, so
+ * they will cause the whole Flink job to fail over. In this class, test cases 
are added when there
+ * might only be fail-overs on the subtasks' side, while the coordinators are 
not affected. In this
+ * case the production infrastructure code needs to work together with user 
code (implementations
+ * inside the coordinator and operator subclass) to ensure the exactly-once 
semantics of operator
+ * event delivery.
+ *
+ * <p>In the test cases of this class, the tested coordinator would inject 
failure during its
+ * sending operator events. Besides, it is additionally guaranteed that there 
must have been a
+ * checkpoint completed before the failure is injected, and that there must be 
events sent from the
+ * coordinator to its subtask during checkpoint.
+ */
+public class CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase
+        extends CoordinatorEventsExactlyOnceITCase {
+
+    private static final int NUM_EVENTS = 100;
+
+    private static final int DELAY = 1;
+
+    /** Whether the coordinator has sent any event to its subtask after any 
checkpoint. */
+    private static boolean sentEventAfterCkpt;
+
+    /** Whether the {@link IdlingSourceFunction} should be closed to finish 
the job. */
+    private static boolean shouldCloseSource;
+
+    private StreamExecutionEnvironment env;
+
+    @Before
+    public void setup() {
+        env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.enableCheckpointing(100);
+        shouldCloseSource = false;
+        sentEventAfterCkpt = false;
+    }
+
+    @Test
+    public void testCoordinatorSendEvents() throws Exception {
+        executeAndVerifyResults(
+                env, new EventReceivingOperatorFactory<>("eventReceiving", 
NUM_EVENTS, DELAY));
+    }
+
+    @Test
+    public void testUnalignedCheckpoint() throws Exception {
+        env.getCheckpointConfig().enableUnalignedCheckpoints();
+        executeAndVerifyResults(
+                env, new EventReceivingOperatorFactory<>("eventReceiving", 
NUM_EVENTS, DELAY));
+    }
+
+    @Test
+    public void testFailingCheckpoint() throws Exception {
+        executeAndVerifyResults(
+                env,
+                new FailingCheckpointOperatorFactory<>("failingCheckpoint", 
NUM_EVENTS, DELAY));
+        
assertThat(TestScript.getForOperator("failingCheckpoint-subtask0").hasAlreadyFailed())
+                .isTrue();
+    }
+
+    @Test
+    public void testFailingUnalignedCheckpoint() throws Exception {
+        env.getCheckpointConfig().enableUnalignedCheckpoints();
+        executeAndVerifyResults(
+                env,
+                new FailingCheckpointOperatorFactory<>("failingCheckpoint", 
NUM_EVENTS, DELAY));
+        
assertThat(TestScript.getForOperator("failingCheckpoint-subtask0").hasAlreadyFailed())
+                .isTrue();
+    }
+
+    private void executeAndVerifyResults(
+            StreamExecutionEnvironment env, 
EventReceivingOperatorFactory<Long, Long> factory)
+            throws Exception {
+        // The event receiving operator is not chained together with the 
source operator, so that
+        // when checkpoint barriers are injected into sources, the event 
receiving operator has not
+        // started checkpoint yet.
+        env.addSource(new IdlingSourceFunction<>(), 
TypeInformation.of(Long.class))
+                .transform(
+                        "blockCheckpointBarrier",
+                        TypeInformation.of(Long.class),
+                        new BlockCheckpointBarrierOperator<>())
+                .disableChaining()
+                .transform(factory.name, TypeInformation.of(Long.class), 
factory)
+                .addSink(new DiscardingSink<>());
+
+        JobExecutionResult executionResult =
+                MINI_CLUSTER
+                        .getMiniCluster()
+                        
.executeJobBlocking(env.getStreamGraph().getJobGraph());
+
+        List<Integer> receivedInts =
+                
executionResult.getAccumulatorResult(EventReceivingOperator.ACCUMULATOR_NAME);
+        checkListContainsSequence(receivedInts, NUM_EVENTS);
+    }
+
+    /** A mock source function that does not collect any stream record and 
finishes on demand. */
+    private static class IdlingSourceFunction<T> extends RichSourceFunction<T>
+            implements ParallelSourceFunction<T> {
+        @Override
+        public void run(SourceContext<T> ctx) throws Exception {
+            while (!shouldCloseSource) {
+                Thread.sleep(100);
+            }
+        }
+
+        @Override
+        public void cancel() {}
+    }
+
+    /**
+     * A stream operator that blocks the checkpoint barrier until the 
coordinator has sent events to
+     * its subtask. It helps to guarantee that there are events being sent 
when the coordinator has
+     * completed the checkpoint while the subtask has not yet.
+     */
+    private static class BlockCheckpointBarrierOperator<T> extends 
AbstractStreamOperator<T>
+            implements OneInputStreamOperator<T, T> {
+
+        @Override
+        public void processElement(StreamRecord<T> element) throws Exception {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void snapshotState(StateSnapshotContext context) throws 
Exception {
+            super.snapshotState(context);
+            while (!sentEventAfterCkpt) {
+                Thread.sleep(100);
+            }
+        }
+    }
+
+    /**
+     * A wrapper operator factory for {@link EventReceivingOperator} and {@link
+     * EventSendingCoordinator}.
+     */
+    private static class EventReceivingOperatorFactory<IN, OUT>
+            extends AbstractStreamOperatorFactory<OUT>
+            implements CoordinatedOperatorFactory<OUT>, 
OneInputStreamOperatorFactory<IN, OUT> {
+
+        protected final String name;
+
+        protected final int numEvents;
+
+        private final int delay;
+
+        public EventReceivingOperatorFactory(String name, int numEvents, int 
delay) {
+            this.name = name;
+            this.numEvents = numEvents;
+            this.delay = delay;
+        }
+
+        @Override
+        public OperatorCoordinator.Provider getCoordinatorProvider(
+                String operatorName, OperatorID operatorID) {
+            return new OperatorCoordinator.Provider() {
+
+                @Override
+                public OperatorID getOperatorId() {
+                    return operatorID;
+                }
+
+                @Override
+                public OperatorCoordinator create(OperatorCoordinator.Context 
context) {
+                    return new EventSendingCoordinator(context, name, 
numEvents, delay);
+                }
+            };
+        }
+
+        @Override
+        public <T extends StreamOperator<OUT>> T createStreamOperator(
+                StreamOperatorParameters<OUT> parameters) {
+            EventReceivingOperator<OUT> operator = new 
EventReceivingOperator<>();
+            operator.setup(
+                    parameters.getContainingTask(),
+                    parameters.getStreamConfig(),
+                    parameters.getOutput());
+            parameters
+                    .getOperatorEventDispatcher()
+                    
.registerEventHandler(parameters.getStreamConfig().getOperatorID(), operator);
+            return (T) operator;
+        }
+
+        @Override
+        public Class<? extends StreamOperator> 
getStreamOperatorClass(ClassLoader classLoader) {
+            return EventReceivingOperator.class;
+        }
+    }
+
+    /**
+     * A subclass of {@link 
CoordinatorEventsExactlyOnceITCase.EventSendingCoordinator} that has the
+     * following additional behavior:
+     *
+     * <ul>
+     *   <li>The job must have completed a checkpoint before the coordinator 
injects the failure.
+     *   <li>The failure would be injected after the coordinator has completed 
its first checkpoint
+     *       and before it completes the next.
+     *   <li>There must be events being sent when the coordinator has 
completed the first checkpoint
+     *       while the subtask has not.
+     * </ul>
+     */
+    private static class EventSendingCoordinator
+            extends CoordinatorEventsExactlyOnceITCase.EventSendingCoordinator 
{
+        private static final long NO_CHECKPOINT = -1;
+
+        /** The coordinator should have completed checkpoints before sending 
out this message. */
+        private final int checkpointBeforeMessage;
+
+        /**
+         * The id of a completed checkpoint for the job.
+         *
+         * <ul>
+         *   <li>If not set(equal to {@link #NO_CHECKPOINT}), it will be set 
to the next completed
+         *       checkpoint.
+         *   <li>If the coordinator is restored from the checkpoint, it will 
be set to the restored
+         *       checkpoint.
+         *   <li>Otherwise, its value will not be changed.
+         * </ul>
+         *
+         * <p>It is used to assist the setting of {@link 
#hasCompletedCheckpointForCoordinator} and
+         * {@link #hasCompletedCheckpointForJob}.
+         */
+        private long markedCompletedCheckpointId;
+
+        /** Whether the coordinator has completed any checkpoint. */
+        private boolean hasCompletedCheckpointForCoordinator;
+
+        /** Whether the whole job (both coordinator and operator) has 
completed any checkpoint. */
+        private boolean hasCompletedCheckpointForJob;
+
+        public EventSendingCoordinator(Context context, String name, int 
numEvents, int delay) {
+            super(context, name, numEvents, delay);
+            this.checkpointBeforeMessage = new Random().nextInt(numEvents / 4);
+            this.markedCompletedCheckpointId = NO_CHECKPOINT;
+            this.hasCompletedCheckpointForCoordinator = false;
+            this.hasCompletedCheckpointForJob = false;
+        }
+
+        @Override
+        protected void sendNextEvent() {
+            if (!hasCompletedCheckpointForCoordinator && nextNumber >= 
checkpointBeforeMessage) {
+                return;
+            }
+
+            if (!hasCompletedCheckpointForJob && nextNumber >= failAtMessage) {
+                return;
+            }
+
+            super.sendNextEvent();
+
+            if (!sentEventAfterCkpt && hasCompletedCheckpointForCoordinator) {
+                sentEventAfterCkpt = true;
+            }
+        }
+
+        @Override
+        protected void handleCheckpoint() {
+            if (nextToComplete != null && sentEventAfterCkpt && 
!testScript.hasAlreadyFailed()) {
+                testScript.recordHasFailed();
+                context.failJob(new Exception("test failure"));
+            }
+
+            if (nextToComplete != null) {
+                hasCompletedCheckpointForCoordinator = true;
+            }
+
+            super.handleCheckpoint();
+        }
+
+        @Override
+        public void resetToCheckpoint(long checkpointId, @Nullable byte[] 
checkpointData)
+                throws Exception {
+            super.resetToCheckpoint(checkpointId, checkpointData);
+            markedCompletedCheckpointId = checkpointId;
+            hasCompletedCheckpointForCoordinator = true;
+            hasCompletedCheckpointForJob = true;
+        }
+
+        @Override
+        public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result)
+                throws Exception {
+            super.checkpointCoordinator(checkpointId, result);
+            if (markedCompletedCheckpointId == NO_CHECKPOINT) {
+                markedCompletedCheckpointId = checkpointId;

Review Comment:
   I suppose a checkpoint is only completed after `notifyCheckpointComplete()` is invoked. Is it correct to set `markedCompletedCheckpointId` here, in `checkpointCoordinator()`?
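
   To make the question concrete, here is a minimal sketch of one possible alternative, assuming the intent is that `markedCompletedCheckpointId` only ever holds the id of a checkpoint that has actually completed. `MarkedCheckpointSketch` is a hypothetical, Flink-free stand-in that only mirrors the two callback names; it is not the coordinator from this PR.

```java
import java.util.concurrent.CompletableFuture;

/**
 * Hypothetical stand-in for the test coordinator. It only mirrors the two
 * callback names and does not depend on Flink.
 */
class MarkedCheckpointSketch {
    static final long NO_CHECKPOINT = -1;

    private long markedCompletedCheckpointId = NO_CHECKPOINT;

    /** Takes the coordinator snapshot; the checkpoint as a whole is not complete yet. */
    void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) {
        result.complete(new byte[0]);
        // Intentionally no update of markedCompletedCheckpointId here.
    }

    /** Invoked once the checkpoint has completed; only the first completed id is recorded. */
    void notifyCheckpointComplete(long checkpointId) {
        if (markedCompletedCheckpointId == NO_CHECKPOINT) {
            markedCompletedCheckpointId = checkpointId;
        }
    }

    long getMarkedCompletedCheckpointId() {
        return markedCompletedCheckpointId;
    }
}
```

   With this shape the marker stays at `NO_CHECKPOINT` after `checkpointCoordinator()` and is only set on the first `notifyCheckpointComplete()`. If the current placement is intentional, a comment explaining why would help.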



##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.java:
##########
@@ -0,0 +1,559 @@
+/**
+ * Integration test case that validates the exactly-once mechanism for 
operator events sent around
+ * checkpoint. This class is an extension to {@link 
CoordinatorEventsExactlyOnceITCase}, further
+ * verifying the exactly-once semantics of events in the following conditions:
+ *
+ * <h2>Stream operator recipient</h2>
+ *
+ * <p>In {@link CoordinatorEventsExactlyOnceITCase}, the test cases focus on 
verifying the
+ * correctness of operator coordinator's behavior. It uses a custom {@link 
AbstractInvokable}
+ * subclass that mocks the behavior of coordinator's recipients. This test 
class uses actual stream
+ * operators as the recipient of coordinator events, verifying that stream 
operators can correctly
+ * handle received operator events and inform coordinators of completing 
checkpoint.
+ *
+ * <h2>Non-source stream task</h2>
+ *
+ * <p>In {@link CoordinatorEventsExactlyOnceITCase}, the two tested tasks are 
executed independently
+ * of each other. They do not have the upstream-downstream relationship as 
operators usually do in a
+ * streaming job, and thus both of them are treated as source tasks. This test 
class further
+ * verifies situations when the tested operators are not sources, which means 
when checkpoint
+ * barriers are injected into sources, these operators may not have started 
checkpoint yet.
+ *
+ * <h2>Unaligned checkpoint</h2>
+ *
+ * <p>This class tests both aligned and unaligned checkpoints to verify that 
the correctness of the
+ * event delivery behavior around checkpoint is not affected by this condition.
+ *
+ * <h2>Non-global failure</h2>
+ *
+ * <p>In {@link CoordinatorEventsExactlyOnceITCase}, failures occur at the 
coordinators' side, so
+ * they will cause the whole Flink job to fail over. In this class, test cases 
are added when there
+ * might only be fail-overs on the subtasks' side, while the coordinators are 
not affected. In this
+ * case the production infrastructure code needs to work together with user 
code (implementations
+ * inside the coordinator and operator subclass) to ensure the exactly-once 
semantics of operator
+ * event delivery.
+ *
+ * <p>In the test cases of this class, the tested coordinator would inject 
failure during its
+ * sending operator events. Besides, it is additionally guaranteed that there 
must have been a
+ * checkpoint completed before the failure is injected, and that there must be 
events sent from the
+ * coordinator to its subtask during checkpoint.
+ */
+public class CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase
+        extends CoordinatorEventsExactlyOnceITCase {
+
+    private static final int NUM_EVENTS = 100;
+
+    private static final int DELAY = 1;
+
+    /** Whether the coordinator has sent any event to its subtask after any checkpoint. */
+    private static boolean sentEventAfterCkpt;

Review Comment:
   How about renaming this to `isEventSentAfterFirstCheckpoint`?



##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.java:
##########
@@ -0,0 +1,559 @@
+/**
+ * Integration test case that validates the exactly-once mechanism for 
operator events sent around
+ * checkpoint. This class is an extension to {@link 
CoordinatorEventsExactlyOnceITCase}, further
+ * verifying the exactly-once semantics of events in the following conditions:
+ *
+ * <h2>Stream operator recipient</h2>
+ *
+ * <p>In {@link CoordinatorEventsExactlyOnceITCase}, the test cases focus on 
verifying the
+ * correctness of operator coordinator's behavior. It uses a custom {@link 
AbstractInvokable}
+ * subclass that mocks the behavior of coordinator's recipients. This test 
class uses actual stream
+ * operators as the recipient of coordinator events, verifying that stream 
operators can correctly
+ * handle received operator events and inform coordinators of completing 
checkpoint.
+ *
+ * <h2>Non-source stream task</h2>
+ *
+ * <p>In {@link CoordinatorEventsExactlyOnceITCase}, the two tested tasks are 
executed independently
+ * of each other. They do not have the upstream-downstream relationship as 
operators usually do in a
+ * streaming job, and thus both of them are treated as source tasks. This test 
class further
+ * verifies situations when the tested operators are not sources, which means 
when checkpoint
+ * barriers are injected into sources, these operators may not have started 
checkpoint yet.
+ *
+ * <h2>Unaligned checkpoint</h2>
+ *
+ * <p>This class tests both aligned and unaligned checkpoints to verify that 
the correctness of the
+ * event delivery behavior around checkpoint is not affected by this condition.
+ *
+ * <h2>Non-global failure</h2>
+ *
+ * <p>In {@link CoordinatorEventsExactlyOnceITCase}, failures occur at the 
coordinators' side, so
+ * they will cause the whole Flink job to fail over. In this class, test cases 
are added when there
+ * might only be fail-overs on the subtasks' side, while the coordinators are 
not affected. In this
+ * case the production infrastructure code needs to work together with user 
code (implementations
+ * inside the coordinator and operator subclass) to ensure the exactly-once 
semantics of operator
+ * event delivery.
+ *
+ * <p>In the test cases of this class, the tested coordinator would inject 
failure during its
+ * sending operator events. Besides, it is additionally guaranteed that there 
must have been a
+ * checkpoint completed before the failure is injected, and that there must be 
events sent from the
+ * coordinator to its subtask during checkpoint.
+ */
+public class CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase
+        extends CoordinatorEventsExactlyOnceITCase {
+
+    private static final int NUM_EVENTS = 100;
+
+    private static final int DELAY = 1;
+
+    /** Whether the coordinator has sent any event to its subtask after any 
checkpoint. */
+    private static boolean sentEventAfterCkpt;
+
+    /** Whether the {@link IdlingSourceFunction} should be closed to finish 
the job. */
+    private static boolean shouldCloseSource;
+
+    private StreamExecutionEnvironment env;
+
+    @Before
+    public void setup() {
+        env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.enableCheckpointing(100);
+        shouldCloseSource = false;
+        sentEventAfterCkpt = false;
+    }
+
+    @Test
+    public void testCoordinatorSendEvents() throws Exception {
+        executeAndVerifyResults(
+                env, new EventReceivingOperatorFactory<>("eventReceiving", 
NUM_EVENTS, DELAY));
+    }
+
+    @Test
+    public void testUnalignedCheckpoint() throws Exception {
+        env.getCheckpointConfig().enableUnalignedCheckpoints();
+        executeAndVerifyResults(
+                env, new EventReceivingOperatorFactory<>("eventReceiving", 
NUM_EVENTS, DELAY));
+    }
+
+    @Test
+    public void testFailingCheckpoint() throws Exception {
+        executeAndVerifyResults(
+                env,
+                new FailingCheckpointOperatorFactory<>("failingCheckpoint", 
NUM_EVENTS, DELAY));
+        
assertThat(TestScript.getForOperator("failingCheckpoint-subtask0").hasAlreadyFailed())
+                .isTrue();
+    }
+
+    @Test
+    public void testFailingUnalignedCheckpoint() throws Exception {
+        env.getCheckpointConfig().enableUnalignedCheckpoints();
+        executeAndVerifyResults(
+                env,
+                new FailingCheckpointOperatorFactory<>("failingCheckpoint", 
NUM_EVENTS, DELAY));
+        
assertThat(TestScript.getForOperator("failingCheckpoint-subtask0").hasAlreadyFailed())
+                .isTrue();
+    }
+
+    private void executeAndVerifyResults(
+            StreamExecutionEnvironment env, 
EventReceivingOperatorFactory<Long, Long> factory)
+            throws Exception {
+        // The event receiving operator is not chained together with the 
source operator, so that
+        // when checkpoint barriers are injected into sources, the event 
receiving operator has not
+        // started checkpoint yet.
+        env.addSource(new IdlingSourceFunction<>(), 
TypeInformation.of(Long.class))
+                .transform(
+                        "blockCheckpointBarrier",
+                        TypeInformation.of(Long.class),
+                        new BlockCheckpointBarrierOperator<>())
+                .disableChaining()
+                .transform(factory.name, TypeInformation.of(Long.class), 
factory)
+                .addSink(new DiscardingSink<>());
+
+        JobExecutionResult executionResult =
+                MINI_CLUSTER
+                        .getMiniCluster()
+                        
.executeJobBlocking(env.getStreamGraph().getJobGraph());
+
+        List<Integer> receivedInts =
+                
executionResult.getAccumulatorResult(EventReceivingOperator.ACCUMULATOR_NAME);
+        checkListContainsSequence(receivedInts, NUM_EVENTS);
+    }
+
+    /** A mock source function that does not collect any stream record and 
finishes on demand. */
+    private static class IdlingSourceFunction<T> extends RichSourceFunction<T>
+            implements ParallelSourceFunction<T> {
+        @Override
+        public void run(SourceContext<T> ctx) throws Exception {
+            while (!shouldCloseSource) {
+                Thread.sleep(100);
+            }
+        }
+
+        @Override
+        public void cancel() {}
+    }
+
+    /**
+     * A stream operator that blocks the checkpoint barrier until the 
coordinator has sent events to
+     * its subtask. It helps to guarantee that there are events being sent 
when the coordinator has
+     * completed the checkpoint while the subtask has not yet.
+     */
+    private static class BlockCheckpointBarrierOperator<T> extends 
AbstractStreamOperator<T>
+            implements OneInputStreamOperator<T, T> {
+
+        @Override
+        public void processElement(StreamRecord<T> element) throws Exception {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void snapshotState(StateSnapshotContext context) throws 
Exception {
+            super.snapshotState(context);
+            while (!sentEventAfterCkpt) {
+                Thread.sleep(100);
+            }
+        }
+    }
+
+    /**
+     * A wrapper operator factory for {@link EventReceivingOperator} and {@link
+     * EventSendingCoordinator}.
+     */
+    private static class EventReceivingOperatorFactory<IN, OUT>
+            extends AbstractStreamOperatorFactory<OUT>
+            implements CoordinatedOperatorFactory<OUT>, 
OneInputStreamOperatorFactory<IN, OUT> {
+
+        protected final String name;
+
+        protected final int numEvents;
+
+        private final int delay;
+
+        public EventReceivingOperatorFactory(String name, int numEvents, int 
delay) {
+            this.name = name;
+            this.numEvents = numEvents;
+            this.delay = delay;
+        }
+
+        @Override
+        public OperatorCoordinator.Provider getCoordinatorProvider(
+                String operatorName, OperatorID operatorID) {
+            return new OperatorCoordinator.Provider() {
+
+                @Override
+                public OperatorID getOperatorId() {
+                    return operatorID;
+                }
+
+                @Override
+                public OperatorCoordinator create(OperatorCoordinator.Context 
context) {
+                    return new EventSendingCoordinator(context, name, 
numEvents, delay);
+                }
+            };
+        }
+
+        @Override
+        public <T extends StreamOperator<OUT>> T createStreamOperator(
+                StreamOperatorParameters<OUT> parameters) {
+            EventReceivingOperator<OUT> operator = new 
EventReceivingOperator<>();
+            operator.setup(
+                    parameters.getContainingTask(),
+                    parameters.getStreamConfig(),
+                    parameters.getOutput());
+            parameters
+                    .getOperatorEventDispatcher()
+                    
.registerEventHandler(parameters.getStreamConfig().getOperatorID(), operator);
+            return (T) operator;
+        }
+
+        @Override
+        public Class<? extends StreamOperator> 
getStreamOperatorClass(ClassLoader classLoader) {
+            return EventReceivingOperator.class;
+        }
+    }
+
+    /**
+     * A subclass of {@link 
CoordinatorEventsExactlyOnceITCase.EventSendingCoordinator} that has the
+     * following additional behavior:
+     *
+     * <ul>
+     *   <li>The job must have completed a checkpoint before the coordinator 
injects the failure.
+     *   <li>The failure would be injected after the coordinator has completed 
its first checkpoint
+     *       and before it completes the next.
+     *   <li>There must be events being sent when the coordinator has 
completed the first checkpoint
+     *       while the subtask has not.
+     * </ul>
+     */
+    private static class EventSendingCoordinator
+            extends CoordinatorEventsExactlyOnceITCase.EventSendingCoordinator 
{
+        private static final long NO_CHECKPOINT = -1;
+
+        /** The coordinator should have completed checkpoints before sending out this message. */
+        private final int checkpointBeforeMessage;

Review Comment:
   Would it be better to rename it to `numMessagesBeforeFirstCheckpoint`?
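
   For illustration, this is roughly how the guard in `sendNextEvent()` would read with the suggested name. `SendGuardSketch` and `markCheckpointCompleted()` are hypothetical and Flink-free; only the guard condition is modelled on the code in this PR.

```java
/** Hypothetical, Flink-free sketch of the send guard using the suggested field name. */
class SendGuardSketch {
    private final int numMessagesBeforeFirstCheckpoint;
    private boolean hasCompletedCheckpointForCoordinator = false;
    private int nextNumber = 0;

    SendGuardSketch(int numMessagesBeforeFirstCheckpoint) {
        this.numMessagesBeforeFirstCheckpoint = numMessagesBeforeFirstCheckpoint;
    }

    /** Returns true if an event was sent, false if sending is held back. */
    boolean sendNextEvent() {
        // Hold back once the allowed number of messages has been sent
        // and no checkpoint has completed yet.
        if (!hasCompletedCheckpointForCoordinator
                && nextNumber >= numMessagesBeforeFirstCheckpoint) {
            return false;
        }
        nextNumber++;
        return true;
    }

    /** Stand-in for the point where the coordinator learns a checkpoint completed. */
    void markCheckpointCompleted() {
        hasCompletedCheckpointForCoordinator = true;
    }

    int getNextNumber() {
        return nextNumber;
    }
}
```

   Read this way, the field is a count of messages allowed before the first checkpoint, which the current name `checkpointBeforeMessage` does not convey as directly.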



##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.java:
##########
@@ -0,0 +1,559 @@
+/**
+ * Integration test case that validates the exactly-once mechanism for operator events sent around
+ * checkpoint. This class is an extension to {@link CoordinatorEventsExactlyOnceITCase}, further
+ * verifying the exactly-once semantics of events in the following conditions:
+ *
+ * <h2>Stream operator recipient</h2>
+ *
+ * <p>In {@link CoordinatorEventsExactlyOnceITCase}, the test cases focus on verifying the
+ * correctness of operator coordinator's behavior. It uses a custom {@link AbstractInvokable}
+ * subclass that mocks the behavior of coordinator's recipients. This test class uses actual stream
+ * operators as the recipient of coordinator events, verifying that stream operators can correctly
+ * handle received operator events and inform coordinators of completing checkpoint.
+ *
+ * <h2>Non-source stream task</h2>
+ *
+ * <p>In {@link CoordinatorEventsExactlyOnceITCase}, the two tested tasks are executed independently
+ * of each other. They do not have the upstream-downstream relationship as operators usually do in a
+ * streaming job, and thus both of them are treated as source tasks. This test class further
+ * verifies situations when the tested operators are not sources, which means when checkpoint
+ * barriers are injected into sources, these operators may not have started checkpoint yet.
+ *
+ * <h2>Unaligned checkpoint</h2>
+ *
+ * <p>This class tests both aligned and unaligned checkpoints to verify that the correctness of the
+ * event delivery behavior around checkpoint is not affected by this condition.
+ *
+ * <h2>Non-global failure</h2>
+ *
+ * <p>In {@link CoordinatorEventsExactlyOnceITCase}, failures occur at the coordinators' side, so
+ * they will cause the whole Flink job to fail over. In this class, test cases are added when there
+ * might only be fail-overs on the subtasks' side, while the coordinators are not affected. In this
+ * case the production infrastructure code needs to work together with user code (implementations
+ * inside the coordinator and operator subclass) to ensure the exactly-once semantics of operator
+ * event delivery.
+ *
+ * <p>In the test cases of this class, the tested coordinator would inject failure during its
+ * sending operator events. Besides, it is additionally guaranteed that there must have been a
+ * checkpoint completed before the failure is injected, and that there must be events sent from the
+ * coordinator to its subtask during checkpoint.
+ */
+public class CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase
+        extends CoordinatorEventsExactlyOnceITCase {
+
+    private static final int NUM_EVENTS = 100;
+
+    private static final int DELAY = 1;
+
+    /** Whether the coordinator has sent any event to its subtask after any checkpoint. */
+    private static boolean sentEventAfterCkpt;
+
+    /** Whether the {@link IdlingSourceFunction} should be closed to finish the job. */
+    private static boolean shouldCloseSource;
+
+    private StreamExecutionEnvironment env;
+
+    @Before
+    public void setup() {
+        env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.enableCheckpointing(100);
+        shouldCloseSource = false;
+        sentEventAfterCkpt = false;
+    }
+
+    @Test
+    public void testCoordinatorSendEvents() throws Exception {
+        executeAndVerifyResults(
+                env, new EventReceivingOperatorFactory<>("eventReceiving", NUM_EVENTS, DELAY));
+    }
+
+    @Test
+    public void testUnalignedCheckpoint() throws Exception {
+        env.getCheckpointConfig().enableUnalignedCheckpoints();
+        executeAndVerifyResults(
+                env, new EventReceivingOperatorFactory<>("eventReceiving", NUM_EVENTS, DELAY));
+    }
+
+    @Test
+    public void testFailingCheckpoint() throws Exception {
+        executeAndVerifyResults(
+                env,
+                new FailingCheckpointOperatorFactory<>("failingCheckpoint", NUM_EVENTS, DELAY));
+        assertThat(TestScript.getForOperator("failingCheckpoint-subtask0").hasAlreadyFailed())
+                .isTrue();
+    }
+
+    @Test
+    public void testFailingUnalignedCheckpoint() throws Exception {
+        env.getCheckpointConfig().enableUnalignedCheckpoints();
+        executeAndVerifyResults(
+                env,
+                new FailingCheckpointOperatorFactory<>("failingCheckpoint", NUM_EVENTS, DELAY));
+        assertThat(TestScript.getForOperator("failingCheckpoint-subtask0").hasAlreadyFailed())
+                .isTrue();
+    }
+
+    private void executeAndVerifyResults(
+            StreamExecutionEnvironment env, EventReceivingOperatorFactory<Long, Long> factory)
+            throws Exception {
+        // The event receiving operator is not chained together with the source operator, so that
+        // when checkpoint barriers are injected into sources, the event receiving operator has not
+        // started checkpoint yet.
+        env.addSource(new IdlingSourceFunction<>(), TypeInformation.of(Long.class))
+                .transform(
+                        "blockCheckpointBarrier",
+                        TypeInformation.of(Long.class),
+                        new BlockCheckpointBarrierOperator<>())
+                .disableChaining()
+                .transform(factory.name, TypeInformation.of(Long.class), factory)
+                .addSink(new DiscardingSink<>());
+
+        JobExecutionResult executionResult =
+                MINI_CLUSTER
+                        .getMiniCluster()
+                        .executeJobBlocking(env.getStreamGraph().getJobGraph());
+
+        List<Integer> receivedInts =
+                executionResult.getAccumulatorResult(EventReceivingOperator.ACCUMULATOR_NAME);
+        checkListContainsSequence(receivedInts, NUM_EVENTS);
+    }
+
+    /** A mock source function that does not collect any stream record and finishes on demand. */
+    private static class IdlingSourceFunction<T> extends RichSourceFunction<T>
+            implements ParallelSourceFunction<T> {
+        @Override
+        public void run(SourceContext<T> ctx) throws Exception {
+            while (!shouldCloseSource) {
+                Thread.sleep(100);
+            }
+        }
+
+        @Override
+        public void cancel() {}
+    }
+
+    /**
+     * A stream operator that blocks the checkpoint barrier until the coordinator has sent events to
+     * its subtask. It helps to guarantee that there are events being sent when the coordinator has
+     * completed the checkpoint while the subtask has not yet.
+     */
+    private static class BlockCheckpointBarrierOperator<T> extends AbstractStreamOperator<T>
+            implements OneInputStreamOperator<T, T> {
+
+        @Override
+        public void processElement(StreamRecord<T> element) throws Exception {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void snapshotState(StateSnapshotContext context) throws Exception {
+            super.snapshotState(context);
+            while (!sentEventAfterCkpt) {
+                Thread.sleep(100);
+            }
+        }
+    }
+
+    /**
+     * A wrapper operator factory for {@link EventReceivingOperator} and {@link
+     * EventSendingCoordinator}.
+     */
+    private static class EventReceivingOperatorFactory<IN, OUT>
+            extends AbstractStreamOperatorFactory<OUT>
+            implements CoordinatedOperatorFactory<OUT>, OneInputStreamOperatorFactory<IN, OUT> {
+
+        protected final String name;
+
+        protected final int numEvents;
+
+        private final int delay;
+
+        public EventReceivingOperatorFactory(String name, int numEvents, int delay) {
+            this.name = name;
+            this.numEvents = numEvents;
+            this.delay = delay;
+        }
+
+        @Override
+        public OperatorCoordinator.Provider getCoordinatorProvider(
+                String operatorName, OperatorID operatorID) {
+            return new OperatorCoordinator.Provider() {
+
+                @Override
+                public OperatorID getOperatorId() {
+                    return operatorID;
+                }
+
+                @Override
+                public OperatorCoordinator create(OperatorCoordinator.Context context) {
+                    return new EventSendingCoordinator(context, name, numEvents, delay);
+                }
+            };
+        }
+
+        @Override
+        public <T extends StreamOperator<OUT>> T createStreamOperator(
+                StreamOperatorParameters<OUT> parameters) {
+            EventReceivingOperator<OUT> operator = new EventReceivingOperator<>();
+            operator.setup(
+                    parameters.getContainingTask(),
+                    parameters.getStreamConfig(),
+                    parameters.getOutput());
+            parameters
+                    .getOperatorEventDispatcher()
+                    .registerEventHandler(parameters.getStreamConfig().getOperatorID(), operator);
+            return (T) operator;
+        }
+
+        @Override
+        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
+            return EventReceivingOperator.class;
+        }
+    }
+
+    /**
+     * A subclass of {@link CoordinatorEventsExactlyOnceITCase.EventSendingCoordinator} that has the
+     * following additional behavior:
+     *
+     * <ul>
+     *   <li>The job must have completed a checkpoint before the coordinator injects the failure.
+     *   <li>The failure would be injected after the coordinator has completed its first checkpoint
+     *       and before it completes the next.
+     *   <li>There must be events being sent when the coordinator has completed the first checkpoint
+     *       while the subtask has not.
+     * </ul>
+     */
+    private static class EventSendingCoordinator
+            extends CoordinatorEventsExactlyOnceITCase.EventSendingCoordinator {
+        private static final long NO_CHECKPOINT = -1;
+
+        /** The coordinator should have completed checkpoints before sending out this message. */
+        private final int checkpointBeforeMessage;
+
+        /**
+         * The id of a completed checkpoint for the job.
+         *
+         * <ul>
+         *   <li>If not set(equal to {@link #NO_CHECKPOINT}), it will be set to the next completed
+         *       checkpoint.
+         *   <li>If the coordinator is restored from the checkpoint, it will be set to the restored
+         *       checkpoint.
+         *   <li>Otherwise, its value will not be changed.
+         * </ul>
+         *
+         * <p>It is used to assist the setting of {@link #hasCompletedCheckpointForCoordinator} and
+         * {@link #hasCompletedCheckpointForJob}.
+         */
+        private long markedCompletedCheckpointId;
+
+        /** Whether the coordinator has completed any checkpoint. */
+        private boolean hasCompletedCheckpointForCoordinator;
+
+        /** Whether the whole job (both coordinator and operator) has completed any checkpoint. */
+        private boolean hasCompletedCheckpointForJob;

Review Comment:
   Is it possible to remove this variable and use `markedCompletedCheckpointId == NO_CHECKPOINT` instead?
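
To illustrate the idea, here is a minimal sketch. It assumes the flag becomes true exactly when the marker is first set, which may not hold everywhere in this PR. `CheckpointMarker` and `markCompleted` are hypothetical names used only for this example and are not part of the PR or of Flink.

```java
/** Hypothetical stand-in for the coordinator's checkpoint bookkeeping; not the PR's class. */
public class CheckpointMarker {
    private static final long NO_CHECKPOINT = -1;

    // The only piece of state: the id of the first completed checkpoint, if any.
    private long markedCompletedCheckpointId = NO_CHECKPOINT;

    /** Records the first completed checkpoint id and ignores any later ones. */
    public void markCompleted(long checkpointId) {
        if (markedCompletedCheckpointId == NO_CHECKPOINT) {
            markedCompletedCheckpointId = checkpointId;
        }
    }

    /** Derived check that would take the place of the separate boolean field. */
    public boolean hasCompletedCheckpointForJob() {
        return markedCompletedCheckpointId != NO_CHECKPOINT;
    }

    public long getMarkedCompletedCheckpointId() {
        return markedCompletedCheckpointId;
    }
}
```

With this shape, a guard such as `!hasCompletedCheckpointForJob` would read `markedCompletedCheckpointId == NO_CHECKPOINT`, and there is one fewer field to keep in sync.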



##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.java:
##########
@@ -0,0 +1,559 @@
+
+        public EventSendingCoordinator(Context context, String name, int numEvents, int delay) {
+            super(context, name, numEvents, delay);
+            this.checkpointBeforeMessage = new Random().nextInt(numEvents / 4);
+            this.markedCompletedCheckpointId = NO_CHECKPOINT;
+            this.hasCompletedCheckpointForCoordinator = false;
+            this.hasCompletedCheckpointForJob = false;
+        }
+
+        @Override
+        protected void sendNextEvent() {
+            if (!hasCompletedCheckpointForCoordinator && nextNumber >= checkpointBeforeMessage) {
+                return;
+            }
+
+            if (!hasCompletedCheckpointForJob && nextNumber >= failAtMessage) {
+                return;
+            }
+
+            super.sendNextEvent();
+
+            if (!sentEventAfterCkpt && hasCompletedCheckpointForCoordinator) {
+                sentEventAfterCkpt = true;
+            }
+        }
+
+        @Override
+        protected void handleCheckpoint() {
+            if (nextToComplete != null && sentEventAfterCkpt && !testScript.hasAlreadyFailed()) {
+                testScript.recordHasFailed();
+                context.failJob(new Exception("test failure"));
+            }
+
+            if (nextToComplete != null) {
+                hasCompletedCheckpointForCoordinator = true;
+            }
+
+            super.handleCheckpoint();
+        }
+
+        @Override
+        public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData)
+                throws Exception {
+            super.resetToCheckpoint(checkpointId, checkpointData);
+            markedCompletedCheckpointId = checkpointId;
+            hasCompletedCheckpointForCoordinator = true;
+            hasCompletedCheckpointForJob = true;
+        }
+
+        @Override
+        public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result)
+                throws Exception {
+            super.checkpointCoordinator(checkpointId, result);
+            if (markedCompletedCheckpointId == NO_CHECKPOINT) {

Review Comment:
   Should this run in the mailbox thread?
   
   The same applies to the other methods that access coordinator state.
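
As a rough illustration of the concern, the sketch below confines all reads and writes of the marker to a single executor thread. It uses plain `java.util.concurrent` rather than the coordinator's actual mailbox, and the class name `MailboxConfinedState` and its methods are assumptions made for this example only.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical sketch: all state access is funneled through one "mailbox" thread. */
public class MailboxConfinedState {
    private static final long NO_CHECKPOINT = -1;

    // Single-threaded executor standing in for the coordinator's mailbox thread.
    private final ExecutorService mailbox = Executors.newSingleThreadExecutor();

    // Only read or written from tasks submitted to the mailbox executor.
    private long markedCompletedCheckpointId = NO_CHECKPOINT;

    /** Enqueues the marker update instead of mutating state on the caller's thread. */
    public CompletableFuture<Void> checkpointCoordinator(long checkpointId) {
        return CompletableFuture.runAsync(
                () -> {
                    if (markedCompletedCheckpointId == NO_CHECKPOINT) {
                        markedCompletedCheckpointId = checkpointId;
                    }
                },
                mailbox);
    }

    /** Reads the marker on the mailbox thread as well, so no extra locking is needed. */
    public CompletableFuture<Long> getMarkedCompletedCheckpointId() {
        return CompletableFuture.supplyAsync(() -> markedCompletedCheckpointId, mailbox);
    }

    public void close() {
        mailbox.shutdown();
    }
}
```

Because every access goes through the same thread, the fields need neither `volatile` nor synchronization, which is the property I would expect the overridden methods here to preserve.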



##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.java:
##########
@@ -0,0 +1,559 @@
+    @Test
+    public void testFailingCheckpoint() throws Exception {
+        executeAndVerifyResults(
+                env,
+                new FailingCheckpointOperatorFactory<>("failingCheckpoint", 
NUM_EVENTS, DELAY));
+        
assertThat(TestScript.getForOperator("failingCheckpoint-subtask0").hasAlreadyFailed())
+                .isTrue();
+    }
+
+    @Test
+    public void testFailingUnalignedCheckpoint() throws Exception {
+        env.getCheckpointConfig().enableUnalignedCheckpoints();
+        executeAndVerifyResults(
+                env,
+                new FailingCheckpointOperatorFactory<>("failingCheckpoint", 
NUM_EVENTS, DELAY));
+        
assertThat(TestScript.getForOperator("failingCheckpoint-subtask0").hasAlreadyFailed())
+                .isTrue();
+    }
+
+    private void executeAndVerifyResults(
+            StreamExecutionEnvironment env, 
EventReceivingOperatorFactory<Long, Long> factory)
+            throws Exception {
+        // The event receiving operator is not chained together with the 
source operator, so that
+        // when checkpoint barriers are injected into sources, the event 
receiving operator has not
+        // started checkpoint yet.
+        env.addSource(new IdlingSourceFunction<>(), 
TypeInformation.of(Long.class))
+                .transform(
+                        "blockCheckpointBarrier",
+                        TypeInformation.of(Long.class),
+                        new BlockCheckpointBarrierOperator<>())
+                .disableChaining()
+                .transform(factory.name, TypeInformation.of(Long.class), 
factory)
+                .addSink(new DiscardingSink<>());
+
+        JobExecutionResult executionResult =
+                MINI_CLUSTER
+                        .getMiniCluster()
+                        
.executeJobBlocking(env.getStreamGraph().getJobGraph());
+
+        List<Integer> receivedInts =
+                
executionResult.getAccumulatorResult(EventReceivingOperator.ACCUMULATOR_NAME);
+        checkListContainsSequence(receivedInts, NUM_EVENTS);
+    }
+
+    /** A mock source function that does not collect any stream record and 
finishes on demand. */
+    private static class IdlingSourceFunction<T> extends RichSourceFunction<T>
+            implements ParallelSourceFunction<T> {
+        @Override
+        public void run(SourceContext<T> ctx) throws Exception {
+            while (!shouldCloseSource) {
+                Thread.sleep(100);
+            }
+        }
+
+        @Override
+        public void cancel() {}
+    }
+
+    /**
+     * A stream operator that blocks the checkpoint barrier until the 
coordinator has sent events to
+     * its subtask. It helps to guarantee that there are events being sent 
when the coordinator has
+     * completed the checkpoint while the subtask has not yet.
+     */
+    private static class BlockCheckpointBarrierOperator<T> extends 
AbstractStreamOperator<T>
+            implements OneInputStreamOperator<T, T> {
+
+        @Override
+        public void processElement(StreamRecord<T> element) throws Exception {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void snapshotState(StateSnapshotContext context) throws 
Exception {
+            super.snapshotState(context);
+            while (!sentEventAfterCkpt) {
+                Thread.sleep(100);
+            }
+        }
+    }
+
+    /**
+     * A wrapper operator factory for {@link EventReceivingOperator} and {@link
+     * EventSendingCoordinator}.
+     */
+    private static class EventReceivingOperatorFactory<IN, OUT>
+            extends AbstractStreamOperatorFactory<OUT>
+            implements CoordinatedOperatorFactory<OUT>, 
OneInputStreamOperatorFactory<IN, OUT> {
+
+        protected final String name;
+
+        protected final int numEvents;
+
+        private final int delay;
+
+        public EventReceivingOperatorFactory(String name, int numEvents, int 
delay) {
+            this.name = name;
+            this.numEvents = numEvents;
+            this.delay = delay;
+        }
+
+        @Override
+        public OperatorCoordinator.Provider getCoordinatorProvider(
+                String operatorName, OperatorID operatorID) {
+            return new OperatorCoordinator.Provider() {
+
+                @Override
+                public OperatorID getOperatorId() {
+                    return operatorID;
+                }
+
+                @Override
+                public OperatorCoordinator create(OperatorCoordinator.Context 
context) {
+                    return new EventSendingCoordinator(context, name, 
numEvents, delay);
+                }
+            };
+        }
+
+        @Override
+        public <T extends StreamOperator<OUT>> T createStreamOperator(
+                StreamOperatorParameters<OUT> parameters) {
+            EventReceivingOperator<OUT> operator = new 
EventReceivingOperator<>();
+            operator.setup(
+                    parameters.getContainingTask(),
+                    parameters.getStreamConfig(),
+                    parameters.getOutput());
+            parameters
+                    .getOperatorEventDispatcher()
+                    
.registerEventHandler(parameters.getStreamConfig().getOperatorID(), operator);
+            return (T) operator;
+        }
+
+        @Override
+        public Class<? extends StreamOperator> 
getStreamOperatorClass(ClassLoader classLoader) {
+            return EventReceivingOperator.class;
+        }
+    }
+
+    /**
+     * A subclass of {@link 
CoordinatorEventsExactlyOnceITCase.EventSendingCoordinator} that has the
+     * following additional behavior:
+     *
+     * <ul>
+     *   <li>The job must have completed a checkpoint before the coordinator 
injects the failure.
+     *   <li>The failure would be injected after the coordinator has completed 
its first checkpoint
+     *       and before it completes the next.
+     *   <li>There must be events being sent when the coordinator has 
completed the first checkpoint
+     *       while the subtask has not.
+     * </ul>
+     */
+    private static class EventSendingCoordinator
+            extends CoordinatorEventsExactlyOnceITCase.EventSendingCoordinator 
{
+        private static final long NO_CHECKPOINT = -1;
+
+        /** The coordinator should have completed checkpoints before sending 
out this message. */
+        private final int checkpointBeforeMessage;
+
+        /**
+         * The id of a completed checkpoint for the job.
+         *
+         * <ul>
+         *   <li>If not set(equal to {@link #NO_CHECKPOINT}), it will be set 
to the next completed
+         *       checkpoint.
+         *   <li>If the coordinator is restored from the checkpoint, it will 
be set to the restored
+         *       checkpoint.
+         *   <li>Otherwise, its value will not be changed.
+         * </ul>
+         *
+         * <p>It is used to assist the setting of {@link 
#hasCompletedCheckpointForCoordinator} and
+         * {@link #hasCompletedCheckpointForJob}.
+         */
+        private long markedCompletedCheckpointId;
+
+        /** Whether the coordinator has completed any checkpoint. */
+        private boolean hasCompletedCheckpointForCoordinator;

Review Comment:
   How about `isCoordinatorFirstCheckpointCompleted`?



##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.java:
##########
@@ -0,0 +1,559 @@
+    private static class EventSendingCoordinator
+            extends CoordinatorEventsExactlyOnceITCase.EventSendingCoordinator 
{
+        private static final long NO_CHECKPOINT = -1;
+
+        /** The coordinator should have completed checkpoints before sending 
out this message. */
+        private final int checkpointBeforeMessage;
+
+        /**
+         * The id of a completed checkpoint for the job.
+         *
+         * <ul>
+         *   <li>If not set(equal to {@link #NO_CHECKPOINT}), it will be set 
to the next completed
+         *       checkpoint.
+         *   <li>If the coordinator is restored from the checkpoint, it will 
be set to the restored
+         *       checkpoint.
+         *   <li>Otherwise, its value will not be changed.

Review Comment:
   Should this doc focus on explaining the semantics of this variable? If so, it seems we could simply say that it refers to the ID of the last completed checkpoint. What does `will not be changed` mean here?
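
   To make the question concrete, here is a minimal standalone sketch of how the three bullets read to me. The class and method names (`CompletedCheckpointTracker`, `notifyCheckpointComplete`, `resetToCheckpoint`) are hypothetical stand-ins chosen for illustration, not the PR's actual code. If this reading is right, the field holds the first marked checkpoint rather than the last completed one, and the doc could probably say so directly.

```java
// Hypothetical, simplified stand-in for the field under review; not the PR's code.
class CompletedCheckpointTracker {
    static final long NO_CHECKPOINT = -1;

    // ID of the marked checkpoint, or NO_CHECKPOINT if none has been marked yet.
    private long markedCompletedCheckpointId = NO_CHECKPOINT;

    // Bullet 1: if unset, take the next completed checkpoint.
    // Bullet 3: otherwise keep the existing value ("will not be changed").
    void notifyCheckpointComplete(long checkpointId) {
        if (markedCompletedCheckpointId == NO_CHECKPOINT) {
            markedCompletedCheckpointId = checkpointId;
        }
    }

    // Bullet 2: on restore, take the ID of the restored checkpoint.
    void resetToCheckpoint(long checkpointId) {
        markedCompletedCheckpointId = checkpointId;
    }

    long getMarkedCompletedCheckpointId() {
        return markedCompletedCheckpointId;
    }
}
```

   Under this reading, completing checkpoints 3 and then 4 leaves the field at 3, which is not "the last completed checkpoint".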



