tillrohrmann commented on a change in pull request #15557:
URL: https://github.com/apache/flink/pull/15557#discussion_r612582795



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
##########
@@ -340,6 +341,12 @@ public AkkaRpcServiceBuilder withBindPort(int bindPort) {
         }
 
         public AkkaRpcService createAndStart() throws Exception {
+            return createAndStart(AkkaRpcService::new);
+        }
+
+        public AkkaRpcService createAndStart(

Review comment:
       I would suggest updating the commit message to say "... to support instantiating custom AkkaRpcServices."

##########
File path: 
flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java
##########
@@ -0,0 +1,449 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceSplit;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
+import org.apache.flink.runtime.source.event.AddSplitEvent;
+import org.apache.flink.runtime.source.event.NoMoreSplitsEvent;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGatewayAdapter;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.function.TriFunction;
+
+import akka.actor.ActorSystem;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.LongStream;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * A test suite for source enumerator (operator coordinator) for situations where RPC calls for
+ * split assignments (operator events) fails from time to time.
+ */
+@SuppressWarnings("serial")
+public class OperatorEventSendingCheckpointITCase {

Review comment:
       `extends TestLogger` is missing.
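
   For illustration, a minimal sketch of that fix (assuming the usual `org.apache.flink.util.TestLogger` base class from the Flink test utilities):

   ```
   import org.apache.flink.util.TestLogger;

   public class OperatorEventSendingCheckpointITCase extends TestLogger {
       // ... test code unchanged ...
   }
   ```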

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
##########
@@ -216,13 +201,8 @@ public void signalNoMoreSplits(int subtask) {
         // Ensure the split assignment is done by the the coordinator executor.
         callInCoordinatorThread(
                 () -> {
-                    try {
-                        operatorCoordinatorContext.sendEvent(new NoMoreSplitsEvent(), subtask);
-                        return null; // void return value
-                    } catch (TaskNotRunningException e) {
-                        throw new FlinkRuntimeException(
-                                "Failed to send 'NoMoreSplits' to reader " + subtask, e);
-                    }
+                    operatorCoordinatorContext.sendEvent(new NoMoreSplitsEvent(), subtask);

Review comment:
       Same here. Please ignore this comment if the problem will be addressed in a future PR.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java
##########
@@ -96,42 +94,44 @@ public boolean isShut() {
      * (because it gets dropped through a call to {@link #reset()} or {@link #resetForTask(int)},
      * then the returned future till be completed exceptionally.
      */
-    public CompletableFuture<Acknowledge> sendEvent(
-            SerializedValue<OperatorEvent> event, int subtask) {
-        synchronized (lock) {
-            if (!shut) {
-                return eventSender.apply(event, subtask);
-            }
-
-            final List<BlockedEvent> eventsForTask =
-                    blockedEvents.computeIfAbsent(subtask, (key) -> new ArrayList<>());
-            final CompletableFuture<Acknowledge> future = new CompletableFuture<>();
-            eventsForTask.add(new BlockedEvent(event, subtask, future));
-            return future;
+    public void sendEvent(
+            SerializedValue<OperatorEvent> event,
+            int subtask,
+            CompletableFuture<Acknowledge> result) {
+        checkRunsInMainThread();
+
+        if (!shut) {
+            final CompletableFuture<Acknowledge> ack = eventSender.apply(event, subtask);
+            FutureUtils.forward(ack, result);
+            return;
         }
+
+        final List<BlockedEvent> eventsForTask =
+                blockedEvents.computeIfAbsent(subtask, (key) -> new ArrayList<>());
+        eventsForTask.add(new BlockedEvent(event, subtask, result));
     }
 
     /**
      * Shuts the value. All events sent through this valve are blocked until the valve is re-opened.
      * If the valve is already shut, this does nothing.
      */

Review comment:
       The JavaDocs seem to be wrong.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGatewayAdapter.java
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.blob.TransientBlobKey;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.PartitionInfo;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobmaster.AllocatedSlotReport;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.TaskThreadInfoResponse;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
+import org.apache.flink.runtime.rest.messages.LogInfo;
+import org.apache.flink.runtime.rest.messages.taskmanager.ThreadDumpInfo;
+import org.apache.flink.runtime.webmonitor.threadinfo.ThreadInfoSamplesRequest;
+import org.apache.flink.types.SerializableOptional;
+import org.apache.flink.util.SerializedValue;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A class that decorates/forwards calls to a {@link TaskExecutorGateway}.
+ *
+ * <p>This class is meant as a base for custom decorators, to avoid having to maintain all the
+ * method overrides in each decorator.
+ */
+public class TaskExecutorGatewayAdapter implements TaskExecutorGateway {

Review comment:
       nit: Maybe call it `TaskExecutorGatewayDecorator`

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
##########
@@ -69,9 +68,9 @@
  * <ul>
  *   <li>Events pass through a special channel, the {@link OperatorEventValve}. If we are not
  *       currently triggering a checkpoint, then events simply pass through.
- *   <li>Atomically, with the completion of the checkpoint future for the coordinator, this operator
- *       operator event valve is closed. Events coming after that are held back (buffered), because
- *       they belong to the epoch after the checkpoint.
+ *   <li>With the completion of the checkpoint future for the coordinator, this operator operator

Review comment:
       operator operator -> operator

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java
##########
@@ -96,42 +94,44 @@ public boolean isShut() {
      * (because it gets dropped through a call to {@link #reset()} or {@link #resetForTask(int)},
      * then the returned future till be completed exceptionally.
      */
-    public CompletableFuture<Acknowledge> sendEvent(
-            SerializedValue<OperatorEvent> event, int subtask) {
-        synchronized (lock) {
-            if (!shut) {
-                return eventSender.apply(event, subtask);
-            }
-
-            final List<BlockedEvent> eventsForTask =
-                    blockedEvents.computeIfAbsent(subtask, (key) -> new ArrayList<>());
-            final CompletableFuture<Acknowledge> future = new CompletableFuture<>();
-            eventsForTask.add(new BlockedEvent(event, subtask, future));
-            return future;
+    public void sendEvent(
+            SerializedValue<OperatorEvent> event,
+            int subtask,
+            CompletableFuture<Acknowledge> result) {

Review comment:
       JavaDocs for the parameters are missing. Is there any contract about which thread should complete the `result` future?
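
   As a sketch, the parameter docs could take roughly this shape (the description of `result` is only a placeholder; the open threading question above would need to be answered there):

   ```
   /**
    * Sends the event directly if the valve is open, or buffers it if the valve is shut.
    *
    * @param event the serialized operator event to send
    * @param subtask the index of the subtask that should receive the event
    * @param result the future that is completed with the acknowledgement once the event has been
    *     sent, or completed exceptionally if the event is dropped (e.g. via {@link #reset()} or
    *     {@link #resetForTask(int)}); which thread completes it should be stated here
    */
   ```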

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
##########
@@ -238,8 +241,11 @@ public void notifyCheckpointAborted(long checkpointId) {
     @Override
     public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData)
             throws Exception {
-        // ideally we would like to check this here, however this method is called early during
-        // execution graph construction, before the main thread executor is set
+        // the first time this method is called is early during execution graph construction,
+        // before the main thread executor is set. hence this conditional check.
+        if (mainThreadExecutor != null) {
+            mainThreadExecutor.assertRunningInMainThread();
+        }

Review comment:
       Strictly speaking, I think we no longer need `OperatorCoordinatorHolder.lazyInitialize`, because the main thread executor is now known when the `ExecutionGraph` is created. This is, however, a follow-up refactoring task to clean it up.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
##########
@@ -195,12 +186,6 @@ public void assignSplits(SplitsAssignment<SplitT> assignment) {
                                             operatorCoordinatorContext.sendEvent(
                                                     new AddSplitEvent<>(splits, splitSerializer),
                                                     id);

Review comment:
       Same here. If this problem will be addressed in a future PR, then ignore 
these comments.

##########
File path: 
flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java
##########
@@ -0,0 +1,449 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceSplit;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
+import org.apache.flink.runtime.source.event.AddSplitEvent;
+import org.apache.flink.runtime.source.event.NoMoreSplitsEvent;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGatewayAdapter;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.function.TriFunction;
+
+import akka.actor.ActorSystem;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.LongStream;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * A test suite for source enumerator (operator coordinator) for situations where RPC calls for
+ * split assignments (operator events) fails from time to time.
+ */
+@SuppressWarnings("serial")

Review comment:
       Where are the serialization warnings coming from?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java
##########
@@ -46,25 +46,20 @@
 
     private static final long NO_CHECKPOINT = Long.MIN_VALUE;
 
-    private final Object lock = new Object();
-
-    @GuardedBy("lock")
     private final BiFunction<
                     SerializedValue<OperatorEvent>, Integer, CompletableFuture<Acknowledge>>
             eventSender;
 
-    @GuardedBy("lock")
     private final Map<Integer, List<BlockedEvent>> blockedEvents = new LinkedHashMap<>();
 
-    @GuardedBy("lock")
     private long currentCheckpointId;
 
-    @GuardedBy("lock")
     private long lastCheckpointId;
 
-    @GuardedBy("lock")
     private boolean shut;
 
+    @Nullable private ComponentMainThreadExecutor mainThreadExecutor;

Review comment:
       I think we should update the JavaDocs of this class. With the introduction of the `mainThreadExecutor`, this class is no longer thread-safe and assumes that certain calls are executed by the `mainThreadExecutor`.
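
   As a sketch, the class-level JavaDocs could gain a paragraph along these lines (wording is only a suggestion):

   ```
   /**
    * ... existing class description ...
    *
    * <p>IMPORTANT: This class is not thread-safe. Once the {@code ComponentMainThreadExecutor}
    * has been set, all state-changing methods must be called from that executor's thread, which
    * is asserted via {@code checkRunsInMainThread()}.
    */
   ```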

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
##########
@@ -147,16 +146,8 @@ public MetricGroup metricGroup() {
     public void sendEventToSourceReader(int subtaskId, SourceEvent event) {
         callInCoordinatorThread(
                 () -> {
-                    try {
-                        operatorCoordinatorContext.sendEvent(
-                                new SourceEventWrapper(event), subtaskId);
-                        return null;
-                    } catch (TaskNotRunningException e) {
-                        throw new FlinkRuntimeException(
-                                String.format(
-                                        "Failed to send event %s to subtask %d", event, subtaskId),
-                                e);
-                    }
+                    operatorCoordinatorContext.sendEvent(new SourceEventWrapper(event), subtaskId);

Review comment:
       I think this line can swallow a potential exception, because the failure is now only reported via the returned `CompletableFuture`.
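
   A minimal sketch of how the failure could still be surfaced, assuming the new `sendEvent` returns the acknowledgement future (the reaction chosen here is only illustrative):

   ```
   operatorCoordinatorContext
           .sendEvent(new SourceEventWrapper(event), subtaskId)
           .whenComplete(
                   (ack, failure) -> {
                       if (failure != null) {
                           // illustrative reaction; logging or retrying would be alternatives
                           operatorCoordinatorContext.failJob(failure);
                       }
                   });
   ```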

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java
##########
@@ -96,42 +94,44 @@ public boolean isShut() {
      * (because it gets dropped through a call to {@link #reset()} or {@link #resetForTask(int)},
      * then the returned future till be completed exceptionally.
      */
-    public CompletableFuture<Acknowledge> sendEvent(
-            SerializedValue<OperatorEvent> event, int subtask) {
-        synchronized (lock) {
-            if (!shut) {
-                return eventSender.apply(event, subtask);
-            }
-
-            final List<BlockedEvent> eventsForTask =
-                    blockedEvents.computeIfAbsent(subtask, (key) -> new ArrayList<>());
-            final CompletableFuture<Acknowledge> future = new CompletableFuture<>();
-            eventsForTask.add(new BlockedEvent(event, subtask, future));
-            return future;
+    public void sendEvent(
+            SerializedValue<OperatorEvent> event,
+            int subtask,
+            CompletableFuture<Acknowledge> result) {

Review comment:
       nit: Returning an ack future instead of passing a future into the method 
could make the API a bit nicer. Of course, this would mean that we have to do 
something like
   
   ```
   CompletableFuture.supplyAsync(() -> sendEvent(), executor).thenCompose(Functions.identity())
   ```
   
   in order to get the future and execute `sendEvent` in the main thread.
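
   Spelled out a bit more, that pattern could look roughly like this (assuming a `sendEvent` variant that returns the ack future and must run in the main thread; names are illustrative):

   ```
   import java.util.concurrent.CompletableFuture;
   import java.util.function.Function;

   public CompletableFuture<Acknowledge> sendEventFromAnyThread(
           SerializedValue<OperatorEvent> event, int subtask) {
       // hop onto the main thread executor, call sendEvent there, and flatten the nested future
       return CompletableFuture.supplyAsync(() -> sendEvent(event, subtask), mainThreadExecutor)
               .thenCompose(Function.identity());
   }
   ```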

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java
##########
@@ -226,6 +225,12 @@ private static void failAllFutures(@Nullable List<BlockedEvent> events) {
         }
     }
 
+    private void checkRunsInMainThread() {

Review comment:
       I think the lazy initializations are no longer necessary because the 
`JobMaster` is now a `PermanentlyFencedRpcEndpoint`.

##########
File path: 
flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java
##########
@@ -0,0 +1,449 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceSplit;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
+import org.apache.flink.runtime.source.event.AddSplitEvent;
+import org.apache.flink.runtime.source.event.NoMoreSplitsEvent;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGatewayAdapter;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.function.TriFunction;
+
+import akka.actor.ActorSystem;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.LongStream;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * A test suite for source enumerator (operator coordinator) for situations where RPC calls for
+ * split assignments (operator events) fails from time to time.
+ */
+@SuppressWarnings("serial")
+public class OperatorEventSendingCheckpointITCase {
+
+    private static final int PARALLELISM = 1;
+    private static MiniCluster flinkCluster;
+
+    @BeforeClass
+    public static void setupMiniClusterAndEnv() throws Exception {
+        flinkCluster = new MiniClusterWithRpcIntercepting(PARALLELISM);
+        flinkCluster.start();
+        TestStreamEnvironment.setAsContext(flinkCluster, PARALLELISM);
+    }
+
+    @AfterClass
+    public static void clearEnvAndStopMiniCluster() throws Exception {
+        TestStreamEnvironment.unsetAsContext();
+        if (flinkCluster != null) {
+            flinkCluster.close();
+            flinkCluster = null;
+        }
+    }
+
+    // ------------------------------------------------------------------------
+    //  tests
+    // ------------------------------------------------------------------------
+
+    /**
+     * Every second assign split event is lost. Eventually, the enumerator must recognize that an
+     * event was lost and trigger recovery to prevent data loss. Data loss would manifest in a
+     * stalled test, because we could wait forever to collect the required number of events back.
+     */
+    @Ignore // ignore for now, because this test fails due to FLINK-21996
+    @Test
+    public void testOperatorEventLostNoReaderFailure() throws Exception {
+        final int[] eventsToLose = new int[] {2, 4, 6};
+
+        OpEventRpcInterceptor.currentHandler =
+                new OperatorEventRpcHandler(
+                        (task, operator, event, originalRpcHandler) -> askTimeoutFuture(),
+                        eventsToLose);
+
+        runTest(false);
+    }
+
+    /**
+     * First and third assign split events are lost. In the middle pf all events being processed

Review comment:
       middle of




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

