tillrohrmann commented on a change in pull request #12815:
URL: https://github.com/apache/flink/pull/12815#discussion_r449579733



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/ExecutionDeploymentReconcilerImpl.java
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.taskexecutor.ExecutionDeploymentReport;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Default {@link ExecutionDeploymentReconciler} implementation. Detects 
missing/unknown deployments, and defers
+ * to a provided {@link ExecutionDeploymentReconciliationHandler} to resolve 
them.
+ */
+public class ExecutionDeploymentReconcilerImpl implements 
ExecutionDeploymentReconciler {

Review comment:
       ```suggestion
   public class DefaultExecutionDeploymentReconciler implements 
ExecutionDeploymentReconciler {
   ```
   
   It is not much more specific than `XYZImpl`, but it is a bit more descriptive because it states that this is the default implementation.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/ExecutionDeploymentReconcilerImpl.java
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.taskexecutor.ExecutionDeploymentReport;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Default {@link ExecutionDeploymentReconciler} implementation. Detects 
missing/unknown deployments, and defers
+ * to a provided {@link ExecutionDeploymentReconciliationHandler} to resolve 
them.
+ */
+public class ExecutionDeploymentReconcilerImpl implements 
ExecutionDeploymentReconciler {
+
+       private final ExecutionDeploymentReconciliationHandler handler;
+
+       public 
ExecutionDeploymentReconcilerImpl(ExecutionDeploymentReconciliationHandler 
handler) {
+               this.handler = handler;
+       }
+
+       @Override
+       public void reconcileExecutionDeployments(ResourceID taskExecutorHost, 
ExecutionDeploymentReport executionDeploymentReport, Set<ExecutionAttemptID> 
expectedDeployedExecutions) {
+               final Set<ExecutionAttemptID> executions = new 
HashSet<>(expectedDeployedExecutions);
+
+               
executionDeploymentReport.getExecutions().forEach(executionAttemptID -> {

Review comment:
       I prefer the KISS principle and would simply use the for-each loop here.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
##########
@@ -753,8 +753,11 @@ public void deploy() throws JobException {
                                .thenCompose(Function.identity())
                                .whenCompleteAsync(
                                        (ack, failure) -> {
-                                               // only respond to the failure 
case
-                                               if (failure != null) {
+                                               if (failure == null) {
+                                                       getExecutionGraph()
+                                                               
.getExecutionDeploymentListener()

Review comment:
       This call is effectively `getVertex().getExecutionGraph().getExecutionDeploymentListener()`, which violates the Law of Demeter. It couples this code to the implementation detail of where the `ExecutionDeploymentListener` lives.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
##########
@@ -224,10 +227,34 @@ public JobMaster(
                        ClassLoader userCodeLoader,
                        SchedulerNGFactory schedulerNGFactory,
                        ShuffleMaster<?> shuffleMaster,
-                       PartitionTrackerFactory partitionTrackerFactory) throws 
Exception {
+                       PartitionTrackerFactory partitionTrackerFactory,
+                       ExecutionDeploymentTracker executionDeploymentTracker,
+                       ExecutionDeploymentReconciler.Factory 
executionDeploymentReconcilerFactory) throws Exception {
 
                super(rpcService, 
AkkaRpcServiceUtils.createRandomName(JOB_MANAGER_NAME), null);
 
+               final ExecutionDeploymentReconciliationHandler 
executionStateReconciliationHandler = new 
ExecutionDeploymentReconciliationHandler() {
+                       @Override
+                       public void onMissingDeployment(ExecutionAttemptID 
deployment) {
+                               log.debug("Failing deployment {} due to no 
longer being deployed.", deployment);
+                               schedulerNG.updateTaskExecutionState(new 
TaskExecutionState(
+                                       jobGraph.getJobID(), deployment, 
ExecutionState.FAILED, new FlinkException("State de-sync")

Review comment:
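       Maybe add a more specific exception message, e.g. one stating that the execution is no longer deployed on the TaskExecutor, instead of just "State de-sync".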

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/ExecutionDeploymentTracker.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+
+import java.util.Set;
+
+/**
+ * A tracker for deployed executions.
+ */
+public interface ExecutionDeploymentTracker {
+
+       /**
+        * Starts tracking the given execution that was deployed on the given 
host.
+        *
+        * @param deployment deployment to start tracking
+        * @param host hosting task executor
+        */
+       void startTrackingDeployment(ExecutionAttemptID deployment, ResourceID 
host);
+
+       /**
+        * Stops tracking the given execution.
+        *
+        * @param deployment deployment to stop tracking
+        */
+       void stopTrackingDeployment(ExecutionAttemptID deployment);
+
+       /**
+        * Returns all tracked executions for the given host.
+        *
+        * @param host hosting task executor
+        * @return tracked executions
+        */
+       Set<ExecutionAttemptID> getExecutions(ResourceID host);

Review comment:
       ```suggestion
        Set<ExecutionAttemptID> getExecutionsOn(ResourceID host);
   ```
   
   or maybe `getExecutionsOnHost`

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/ExecutionDeploymentTracker.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+
+import java.util.Set;
+
+/**
+ * A tracker for deployed executions.
+ */
+public interface ExecutionDeploymentTracker {
+
+       /**
+        * Starts tracking the given execution that was deployed on the given 
host.
+        *
+        * @param deployment deployment to start tracking
+        * @param host hosting task executor
+        */
+       void startTrackingDeployment(ExecutionAttemptID deployment, ResourceID 
host);
+
+       /**
+        * Stops tracking the given execution.
+        *
+        * @param deployment deployment to stop tracking
+        */
+       void stopTrackingDeployment(ExecutionAttemptID deployment);

Review comment:
       ```suggestion
        void stopTrackingDeploymentOf(ExecutionAttemptID executionAttemptId);
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToJobManagerHeartbeatPayload.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import java.util.Collections;
+
+/**
+ * Payload for heartbeats sent from the TaskExecutor to the JobManager.
+ */
+public class TaskExecutorToJobManagerHeartbeatPayload {

Review comment:
       The `AkkaRpcService` requires that all messages are serializable, so this payload class needs to implement `Serializable`.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterExecutionDeploymentReconciliationTest.java
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.utils.JobGraphTestUtils;
+import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.jobmaster.utils.JobMasterBuilder;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.taskexecutor.AccumulatorReport;
+import org.apache.flink.runtime.taskexecutor.ExecutionDeploymentReport;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import 
org.apache.flink.runtime.taskexecutor.TaskExecutorToJobManagerHeartbeatPayload;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.LocalUnresolvedTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation;
+import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the execution deployment-reconciliation logic in the {@link 
JobMaster}.
+ */
+public class JobMasterExecutionDeploymentReconciliationTest extends TestLogger 
{
+
+       private static final Time testingTimeout = Time.seconds(10L);
+
+       private static TestingRpcService rpcService;

Review comment:
       You could also use `TestingRpcServiceResource` here.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
##########
@@ -753,8 +753,11 @@ public void deploy() throws JobException {
                                .thenCompose(Function.identity())
                                .whenCompleteAsync(
                                        (ack, failure) -> {
-                                               // only respond to the failure 
case
-                                               if (failure != null) {
+                                               if (failure == null) {
+                                                       getExecutionGraph()
+                                                               
.getExecutionDeploymentListener()
+                                                               
.onCompletedDeployment(attemptId, 
getAssignedResourceLocation().getResourceID());

Review comment:
       What happens if this `Execution` is no longer valid (e.g. it has failed in the meantime)? Shouldn't we guard against this situation by checking what the `currentExecution` is, similar to what we do in `ExecutionVertex.notifyStateTransition`?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
##########
@@ -1601,6 +1604,8 @@ private boolean transitionState(ExecutionState 
currentState, ExecutionState targ
                                }
                        }
 
+                       
getExecutionGraph().getExecutionStateUpdateListener().onStateUpdate(attemptId, 
state);

Review comment:
       I think we can move this to `ExecutionGraph.notifyExecutionChange`. That 
way we will only react to signals coming from a valid `Execution`.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/ExecutionDeploymentReconciler.java
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.taskexecutor.ExecutionDeploymentReport;
+
+import java.util.Set;
+
+/**
+ * Component for reconciling the deployment state of executions.
+ */
+public interface ExecutionDeploymentReconciler {
+
+       /**
+        * Factory for {@link ExecutionDeploymentReconciler}.
+        */
+       interface Factory {
+               ExecutionDeploymentReconciler 
get(ExecutionDeploymentReconciliationHandler reconciliationHandler);

Review comment:
       Maybe
   
   ```suggestion
                ExecutionDeploymentReconciler 
create(ExecutionDeploymentReconciliationHandler reconciliationHandler);
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/ExecutionDeploymentReconciler.java
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.taskexecutor.ExecutionDeploymentReport;
+
+import java.util.Set;
+
+/**
+ * Component for reconciling the deployment state of executions.
+ */
+public interface ExecutionDeploymentReconciler {
+
+       /**
+        * Factory for {@link ExecutionDeploymentReconciler}.
+        */
+       interface Factory {
+               ExecutionDeploymentReconciler 
get(ExecutionDeploymentReconciliationHandler reconciliationHandler);
+       }
+
+       /**
+        * Reconciles the deployment states between all reported/expected 
executions for the given task executor.
+        *
+        * @param taskExecutorHost

Review comment:
       The `@param` description for `taskExecutorHost` is missing.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/ExecutionDeploymentReconciliationHandler.java
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+
+/**
+ * Interface for triggering actions in case of state mismatches.
+ */
+public interface ExecutionDeploymentReconciliationHandler {
+       /**
+        * Called if an execution is expected to be hosted on a task executor, 
but isn't.
+        */
+       void onMissingDeployment(ExecutionAttemptID deployment);

Review comment:
       ```suggestion
        void onMissingDeployment(ExecutionAttemptID executionAttemptId);
   ```
   
   I'm not sure the parameter should really be named `deployment`, since its type is `ExecutionAttemptID`.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/ExecutionDeploymentTracker.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+
+import java.util.Set;
+
+/**
+ * A tracker for deployed executions.
+ */
+public interface ExecutionDeploymentTracker {
+
+       /**
+        * Starts tracking the given execution that was deployed on the given 
host.
+        *
+        * @param deployment deployment to start tracking
+        * @param host hosting task executor
+        */
+       void startTrackingDeployment(ExecutionAttemptID deployment, ResourceID 
host);

Review comment:
       ```suggestion
        void startTrackingDeploymentOf(ExecutionAttemptID executionAttemptId, 
ResourceID host);
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/ExecutionDeploymentReconciliationHandler.java
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+
+/**
+ * Interface for triggering actions in case of state mismatches.
+ */
+public interface ExecutionDeploymentReconciliationHandler {
+       /**
+        * Called if an execution is expected to be hosted on a task executor, 
but isn't.
+        */

Review comment:
       The JavaDocs are incomplete (e.g. the `@param` tag is missing). For interfaces, I think we should provide complete JavaDocs.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/ExecutionDeploymentTrackerImpl.java
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionDeploymentListener;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Default {@link ExecutionDeploymentTracker} implementation.
+ */
+public class ExecutionDeploymentTrackerImpl implements 
ExecutionDeploymentTracker, ExecutionDeploymentListener {

Review comment:
       ```suggestion
   public class DefaultExecutionDeploymentTracker implements 
ExecutionDeploymentTracker, ExecutionDeploymentListener {
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/ExecutionDeploymentTrackerImpl.java
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionDeploymentListener;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Default {@link ExecutionDeploymentTracker} implementation.
+ */
+public class ExecutionDeploymentTrackerImpl implements 
ExecutionDeploymentTracker, ExecutionDeploymentListener {
+
+       private final Map<ResourceID, Set<ExecutionAttemptID>> executionsByHost 
= new HashMap<>();
+       private final Map<ExecutionAttemptID, ResourceID> hostByExecution = new 
HashMap<>();
+
+       @Override
+       public void startTrackingDeployment(ExecutionAttemptID execution, 
ResourceID host) {
+               hostByExecution.put(execution, host);
+               executionsByHost.compute(host, (resourceID, 
executionAttemptIds) -> {
+                       if (executionAttemptIds == null) {
+                               executionAttemptIds = new HashSet<>();
+                       }
+                       executionAttemptIds.add(execution);
+                       return executionAttemptIds;
+               });

Review comment:
       ```suggestion
                executionsByHost.computeIfAbsent(host, ignored -> new 
HashSet<>()).add(execution);
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/ExecutionDeploymentReconcilerImpl.java
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.taskexecutor.ExecutionDeploymentReport;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Default {@link ExecutionDeploymentReconciler} implementation. Detects 
missing/unknown deployments, and defers
+ * to a provided {@link ExecutionDeploymentReconciliationHandler} to resolve 
them.
+ */
+public class ExecutionDeploymentReconcilerImpl implements 
ExecutionDeploymentReconciler {
+
+       private final ExecutionDeploymentReconciliationHandler handler;
+
+       public 
ExecutionDeploymentReconcilerImpl(ExecutionDeploymentReconciliationHandler 
handler) {
+               this.handler = handler;
+       }
+
+       @Override
+       public void reconcileExecutionDeployments(ResourceID taskExecutorHost, 
ExecutionDeploymentReport executionDeploymentReport, Set<ExecutionAttemptID> 
expectedDeployedExecutions) {
+               final Set<ExecutionAttemptID> executions = new 
HashSet<>(expectedDeployedExecutions);
+
+               
executionDeploymentReport.getExecutions().forEach(executionAttemptID -> {
+                       boolean isTracked = 
executions.remove(executionAttemptID);
+                       if (!isTracked) {
+                               handler.onUnknownDeployment(executionAttemptID, 
taskExecutorHost);

Review comment:
       Does it make sense to batch the unknown executions? That way we would not have to do a lookup for every unknown `Execution` individually.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/ExecutionDeploymentReconcilerImpl.java
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.taskexecutor.ExecutionDeploymentReport;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Default {@link ExecutionDeploymentReconciler} implementation. Detects 
missing/unknown deployments, and defers
+ * to a provided {@link ExecutionDeploymentReconciliationHandler} to resolve 
them.
+ */
+public class ExecutionDeploymentReconcilerImpl implements 
ExecutionDeploymentReconciler {
+
+       private final ExecutionDeploymentReconciliationHandler handler;
+
+       public 
ExecutionDeploymentReconcilerImpl(ExecutionDeploymentReconciliationHandler 
handler) {
                // The handler is called back for every unknown deployment (reported but not
                // expected) and every missing deployment (expected but not reported).
+               this.handler = handler;
+       }
+
+       @Override
+       public void reconcileExecutionDeployments(ResourceID taskExecutorHost, 
ExecutionDeploymentReport executionDeploymentReport, Set<ExecutionAttemptID> 
expectedDeployedExecutions) {
+               final Set<ExecutionAttemptID> executions = new 
HashSet<>(expectedDeployedExecutions);
+
+               
executionDeploymentReport.getExecutions().forEach(executionAttemptID -> {
+                       boolean isTracked = 
executions.remove(executionAttemptID);
+                       if (!isTracked) {
+                               handler.onUnknownDeployment(executionAttemptID, 
taskExecutorHost);
+                       }
+               });
+               executions.forEach(handler::onMissingDeployment);

Review comment:
       Same here. If `handler.onMissingDeployments` took a collection of 
`Execution`s, we would only have to call it once.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/ExecutionDeploymentTrackerImplTest.java
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.core.IsCollectionContaining.hasItems;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link ExecutionDeploymentTrackerImpl}.
+ */
+public class ExecutionDeploymentTrackerImplTest extends TestLogger {
+
+       @Test
+       public void testStartTracking() {
                // A deployment that is being tracked must be listed among the executions of its host.
+               final ExecutionDeploymentTrackerImpl tracker = new 
ExecutionDeploymentTrackerImpl();
+
+               final ExecutionAttemptID attemptId1 = new ExecutionAttemptID();
+               final ResourceID resourceId1 = ResourceID.generate();
+               tracker.startTrackingDeployment(attemptId1, resourceId1);
+
+               assertThat(tracker.getExecutions(resourceId1), 
hasItems(attemptId1));
+       }
+
+       @Test
+       public void testOnCompleteEquivalentToStartTracking() {
                // Completing a deployment that was never started registers it for the host,
                // i.e. it has the same effect as startTrackingDeployment.
+               final ExecutionDeploymentTrackerImpl tracker = new 
ExecutionDeploymentTrackerImpl();
+
+               final ExecutionAttemptID attemptId1 = new ExecutionAttemptID();
+               final ResourceID resourceId1 = ResourceID.generate();
+               tracker.onCompletedDeployment(attemptId1, resourceId1);
+
+               assertThat(tracker.getExecutions(resourceId1), 
hasItems(attemptId1));
+       }
+
+       @Test
+       public void testStopTracking() {
                // Stopping the only tracked deployment leaves its host without executions.
+               final ExecutionDeploymentTrackerImpl tracker = new 
ExecutionDeploymentTrackerImpl();
+
+               final ExecutionAttemptID attemptId1 = new ExecutionAttemptID();
+               final ResourceID resourceId1 = ResourceID.generate();
+               tracker.startTrackingDeployment(attemptId1, resourceId1);
+
+               tracker.stopTrackingDeployment(attemptId1);
+
+               assertThat(tracker.getExecutions(resourceId1), empty());
+       }
+
+       @Test
+       public void testStopTrackingDoesNotAffectOtherIds() {
                // Stopping an execution that was never tracked must not remove other tracked executions.
+               final ExecutionDeploymentTrackerImpl tracker = new 
ExecutionDeploymentTrackerImpl();
+
+               final ExecutionAttemptID attemptId1 = new ExecutionAttemptID();
+               final ResourceID resourceId1 = ResourceID.generate();
+               tracker.startTrackingDeployment(attemptId1, resourceId1);
+
+               tracker.stopTrackingDeployment(new ExecutionAttemptID());
+
+               assertThat(tracker.getExecutions(resourceId1), 
hasItems(attemptId1));
+       }
+
+       @Test
+       public void testStopTrackingUnknownExecutionDoesNotThrowException() {
                // There is no assertion: the test passes as long as stopping an untracked
                // execution does not throw.
+               final ExecutionDeploymentTrackerImpl tracker = new 
ExecutionDeploymentTrackerImpl();
+
+               final ExecutionAttemptID attemptId2 = new ExecutionAttemptID();
+               tracker.stopTrackingDeployment(attemptId2);
+       }
+
+       @Test
+       public void testGetExecutionsReturnsEmptySetForUnknownHost() {
+               final ExecutionDeploymentTrackerImpl tracker = new 
ExecutionDeploymentTrackerImpl();
+
+               assertThat(tracker.getExecutions(ResourceID.generate()), 
notNullValue());

Review comment:
       Should we also assert that the returned `Collection` is empty?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/ExecutionDeploymentTrackerImpl.java
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionDeploymentListener;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Default {@link ExecutionDeploymentTracker} implementation.
+ */
+public class ExecutionDeploymentTrackerImpl implements 
ExecutionDeploymentTracker, ExecutionDeploymentListener {

Review comment:
       Why does this class implement `ExecutionDeploymentListener`? I don't think 
it is used anywhere.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/ExecutionDeploymentReconcilerImplTest.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.taskexecutor.ExecutionDeploymentReport;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.concurrent.CompletableFuture;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link ExecutionDeploymentReconcilerImpl}.
+ */
+public class ExecutionDeploymentReconcilerImplTest extends TestLogger {
+
+       @Test
+       public void testMatchingDeployments() throws Exception {
                // The report and the expectation contain the same single execution, so neither the
                // missing- nor the unknown-deployment future may complete.
                // NOTE(review): assumes runTest completes these futures from the handler's
                // onMissingDeployment/onUnknownDeployment callbacks; runTest is not visible here.
+               runTest((reconciler, missingFuture, unknownFuture) -> {
+                       ResourceID resourceId = ResourceID.generate();
+                       ExecutionAttemptID attemptId = new ExecutionAttemptID();
+
+                       reconciler.reconcileExecutionDeployments(
+                               resourceId,
+                               new 
ExecutionDeploymentReport(Collections.singleton(attemptId)),
+                               Collections.singleton(attemptId));
+
+                       assertFalse(missingFuture.isDone());
+                       assertFalse(unknownFuture.isDone());
+               });
+       }

Review comment:
       It's mainly a personal preference, but I think tests are easier to 
understand if one knows what's going on. With `runTest`, a lot of details are 
hidden. For example, the suggestion below makes it a bit easier to 
understand what `missingFuture` and `unknownFuture` actually are.
   
   ```suggestion
          @Test
        public void testMatchingDeployments() {
                // The report and the expectation contain the same single execution, so the handler
                // must see neither a missing nor an unknown deployment.
                // Requires a static import of org.hamcrest.Matchers.empty.
                final TestingExecutionDeploymentReconciliationHandler handler = new TestingExecutionDeploymentReconciliationHandler();
                final ExecutionDeploymentReconciler reconciler = new ExecutionDeploymentReconcilerImpl(handler);
                final ResourceID resourceId = ResourceID.generate();
                final ExecutionAttemptID attemptId = new ExecutionAttemptID();

                reconciler.reconcileExecutionDeployments(
                        resourceId,
                        new ExecutionDeploymentReport(Collections.singleton(attemptId)),
                        Collections.singleton(attemptId));

                assertThat(handler.getMissingDeployments(), is(empty()));
                // assertThat, not assertFalse: assertFalse(Object, Matcher) does not exist
                assertThat(handler.getUnknownDeployments(), is(empty()));
        }
   ```

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterExecutionDeploymentReconciliationTest.java
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.utils.JobGraphTestUtils;
+import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.jobmaster.utils.JobMasterBuilder;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.taskexecutor.AccumulatorReport;
+import org.apache.flink.runtime.taskexecutor.ExecutionDeploymentReport;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import 
org.apache.flink.runtime.taskexecutor.TaskExecutorToJobManagerHeartbeatPayload;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.LocalUnresolvedTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation;
+import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the execution deployment-reconciliation logic in the {@link 
JobMaster}.
+ */
+public class JobMasterExecutionDeploymentReconciliationTest extends TestLogger 
{
+
+       private static final Time testingTimeout = Time.seconds(10L);
+
+       private static TestingRpcService rpcService;
+
+       private final HeartbeatServices heartbeatServices = new 
HeartbeatServices(Integer.MAX_VALUE, Integer.MAX_VALUE);
+
+       private final TestingHighAvailabilityServices haServices = new 
TestingHighAvailabilityServices();
+       private final SettableLeaderRetrievalService 
resourceManagerLeaderRetriever = new SettableLeaderRetrievalService();
+       private final TestingLeaderElectionService 
resourceManagerLeaderElectionService = new TestingLeaderElectionService();
+
+       @Rule
+       public final TestingFatalErrorHandlerResource 
testingFatalErrorHandlerResource = new TestingFatalErrorHandlerResource();
+
+       @Before
+       public void setup() {
                // Plug the test's resource manager leader services and a standalone checkpoint
                // recovery factory into the HA services used by the JobMaster.
+               
haServices.setResourceManagerLeaderRetriever(resourceManagerLeaderRetriever);
+               
haServices.setResourceManagerLeaderElectionService(resourceManagerLeaderElectionService);
+               haServices.setCheckpointRecoveryFactory(new 
StandaloneCheckpointRecoveryFactory());
+       }
+
+       @After
+       public void shutdown() {
                // The RPC service is shared across tests, so drop gateways registered by this test.
+               rpcService.clearGateways();
+       }
+
+       @BeforeClass
+       public static void setupClass() {
                // One RPC service for all tests of this class.
+               rpcService = new TestingRpcService();
+       }
+
+       @AfterClass
+       public static void shutdownClass() throws ExecutionException, 
InterruptedException {
                // Stop the shared RPC service and wait for the stop future to complete.
+               rpcService.stopService().get();
+       }
+
+       @Test
+       public void testExecutionDeploymentReconciliation() throws Exception {
+               JobMasterBuilder.TestingOnCompletionActions onCompletionActions 
= new JobMasterBuilder.TestingOnCompletionActions();
+               JobMaster jobMaster = 
createAndStartJobMaster(onCompletionActions);
+               JobMasterGateway jobMasterGateway = 
jobMaster.getSelfGateway(JobMasterGateway.class);
+               rpcService.registerGateway(jobMasterGateway.getAddress(), 
jobMasterGateway);
+
+               final 
JobMasterPartitionReleaseTest.AllocationIdsResourceManagerGateway 
resourceManagerGateway = createResourceManagerGateway();
+
+               final CompletableFuture<Void> taskSubmissionFuture = new 
CompletableFuture<>();
+               final CompletableFuture<ExecutionAttemptID> 
taskCancellationFuture = new CompletableFuture<>();
+               TaskExecutorGateway taskExecutorGateway = 
createTaskExecutorGateway(taskSubmissionFuture, taskCancellationFuture);
+               LocalUnresolvedTaskManagerLocation 
localUnresolvedTaskManagerLocation = new LocalUnresolvedTaskManagerLocation();
+
+               registerTaskExecutorAndOfferSlots(resourceManagerGateway, 
jobMasterGateway, taskExecutorGateway, localUnresolvedTaskManagerLocation);
+
+               taskSubmissionFuture.get();

Review comment:
       I think this test is unstable because the completion of this future does 
not guarantee that we are tracking the respective task. If you add a 
`Thread.sleep()` in line 161, then you can reproduce it. The reason is that we 
use a `whenCompleteAsync` when deploying an `Execution`.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorExecutionDeploymentReconciliationTest.java
##########
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.BlockerSync;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.blob.BlobCacheService;
+import org.apache.flink.runtime.blob.VoidBlobStore;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
+import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import 
org.apache.flink.runtime.io.network.partition.TestingTaskExecutorPartitionTracker;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmaster.AllocatedSlotInfo;
+import org.apache.flink.runtime.jobmaster.AllocatedSlotReport;
+import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
+import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
+import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import 
org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.taskexecutor.slot.TaskSlotUtils;
+import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource;
+import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.StreamSupport;
+
+import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
+import static org.hamcrest.core.IsCollectionContaining.hasItem;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the execution deployment-reconciliation logic in the {@link 
TaskExecutor}.
+ */
+public class TaskExecutorExecutionDeploymentReconciliationTest extends 
TestLogger {
+
+       private static final Time timeout = Time.seconds(10L);
+
+       private static TestingRpcService rpc;
+
+       private final TestingHighAvailabilityServices haServices = new 
TestingHighAvailabilityServices();
+       private final SettableLeaderRetrievalService jobManagerLeaderRetriever 
= new SettableLeaderRetrievalService();
+       private final SettableLeaderRetrievalService 
resourceManagerLeaderRetriever = new SettableLeaderRetrievalService();
+       private final JobID jobId = new JobID();
+
+       @Rule
+       public final TestingFatalErrorHandlerResource 
testingFatalErrorHandlerResource = new TestingFatalErrorHandlerResource();
+
+       @Before
+       public void setup() {
                // Make the HA services hand out the test's resource manager and job master
                // leader retrievers to the TaskExecutor.
+               
haServices.setResourceManagerLeaderRetriever(resourceManagerLeaderRetriever);
+               haServices.setJobMasterLeaderRetriever(jobId, 
jobManagerLeaderRetriever);
+       }
+
+       @After
+       public void shutdown() {
                // The RPC service is shared across tests, so drop gateways registered by this test.
+               rpc.clearGateways();
+       }
+
+       @BeforeClass
+       public static void setupClass() {
                // One RPC service for all tests of this class.
+               rpc = new TestingRpcService();
+       }
+
+       @AfterClass
+       public static void shutdownClass() throws ExecutionException, 
InterruptedException {
                // Stop the shared RPC service and wait for the stop future to complete.
+               rpc.stopService().get();
+       }
+
+       @Test
+       public void testDeployedExecutionReporting() throws Exception {
                // Checks the deployment report produced in response to JobManager heartbeats:
                // empty before the task is submitted, containing the execution while the task
                // blocks, and empty again once the task has finished.
                // NOTE(review): assumes setupJobManagerGateway (not visible here) offers each
                // reported execution set to deployedExecutionsFuture.
+               final OneShotLatch slotOfferLatch = new OneShotLatch();
                // Capacity matches the three heartbeats triggered below.
+               final BlockingQueue<Set<ExecutionAttemptID>> 
deployedExecutionsFuture = new ArrayBlockingQueue<>(3);
+               final CompletableFuture<Void> taskFinishedFuture = new 
CompletableFuture<>();
+               final ResourceID jobManagerResourceId = ResourceID.generate();
+               final TestingJobMasterGateway jobMasterGateway = 
setupJobManagerGateway(slotOfferLatch, deployedExecutionsFuture, 
taskFinishedFuture, jobManagerResourceId);
+
+               final CompletableFuture<SlotReport> initialSlotReportFuture = 
new CompletableFuture<>();
+               final TestingResourceManagerGateway 
testingResourceManagerGateway = 
setupResourceManagerGateway(initialSlotReportFuture);
+
+               final TaskManagerServices taskManagerServices = new 
TaskManagerServicesBuilder()
+                       .setTaskSlotTable(TaskSlotUtils.createTaskSlotTable(1, 
timeout))
+                       .setShuffleEnvironment(new 
NettyShuffleEnvironmentBuilder().build())
+                       .build();
+
+               final TestingTaskExecutor taskExecutor = 
createTestingTaskExecutor(taskManagerServices);
+
+               try {
+                       taskExecutor.start();
+                       taskExecutor.waitUntilStarted();
+
+                       final TaskExecutorGateway taskExecutorGateway = 
taskExecutor.getSelfGateway(TaskExecutorGateway.class);
+
+                       final TaskDeploymentDescriptor taskDeploymentDescriptor 
= createTaskDeploymentDescriptor(jobId);
+
+                       connectComponentsAndRequestSlot(jobMasterGateway, 
testingResourceManagerGateway, taskExecutorGateway, 
taskManagerServices.getJobLeaderService(), initialSlotReportFuture, 
taskDeploymentDescriptor.getAllocationId());
+
                        // Must be set before submission: TestingInvokable.invoke() blocks on it.
+                       TestingInvokable.sync = new BlockerSync();
+
+                       // This ensures TM has been successfully registered to 
JM.
+                       slotOfferLatch.await();
+
+                       AllocatedSlotReport slotAllocationReport = new 
AllocatedSlotReport(jobId, Collections.singleton(new AllocatedSlotInfo(0, 
taskDeploymentDescriptor.getAllocationId())));
+
+                       // nothing was deployed, so the deployment report should 
be empty
+                       
taskExecutorGateway.heartbeatFromJobManager(jobManagerResourceId, 
slotAllocationReport);
+                       assertThat(deployedExecutionsFuture.take(), hasSize(0));
+
+                       
taskExecutorGateway.submitTask(taskDeploymentDescriptor, 
jobMasterGateway.getFencingToken(), timeout)
+                               .get();
+
                        // Wait until the task is inside invoke(), so it is guaranteed to be running.
+                       TestingInvokable.sync.awaitBlocker();
+
+                       // task is deployed, so the deployment report should 
contain it
+                       
taskExecutorGateway.heartbeatFromJobManager(jobManagerResourceId, 
slotAllocationReport);
+                       assertThat(deployedExecutionsFuture.take(), 
hasItem(taskDeploymentDescriptor.getExecutionAttemptId()));
+
+                       TestingInvokable.sync.releaseBlocker();
+
+                       // task has finished and was cleaned up, so the 
deployment report should be empty
+                       taskFinishedFuture.get();
+                       
taskExecutorGateway.heartbeatFromJobManager(jobManagerResourceId, 
slotAllocationReport);
+                       assertThat(deployedExecutionsFuture.take(), hasSize(0));
+               } finally {
+                       RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
+               }
+       }
+
+       /**
+        * Test invokable which blocks on the shared {@link BlockerSync} when invoked, so that a test
+        * can observe the task while it is running and later let it finish via
+        * {@link BlockerSync#releaseBlocker()}.
+        */
+       public static class TestingInvokable extends AbstractInvokable {
+
                // Static because the invokable is referenced only by class name in the deployment
                // descriptor (presumably instantiated reflectively); tests assign it before submitting.
+               static BlockerSync sync;
+
+               public TestingInvokable(Environment environment) {
+                       super(environment);
+               }
+
+               @Override
+               public void invoke() throws Exception {
+                       sync.block();
+               }
+       }
+
+       private TestingTaskExecutor 
createTestingTaskExecutor(TaskManagerServices taskManagerServices) throws 
IOException {
                // Builds a TaskExecutor on the shared test RPC service and HA services, using a
                // default Configuration, the loopback address and the given TaskManagerServices.
                // NOTE(review): HeartbeatServices(1_000L, 30_000L) is presumably interval/timeout
                // in ms, and the IOException presumably stems from the BlobCacheService; confirm.
+               final Configuration configuration = new Configuration();
+               return new TestingTaskExecutor(
+                       rpc,
+                       TaskManagerConfiguration.fromConfiguration(
+                               configuration,
+                               
TaskExecutorResourceUtils.resourceSpecFromConfigForLocalExecution(configuration),
+                               
InetAddress.getLoopbackAddress().getHostAddress()),
+                       haServices,
+                       taskManagerServices,
+                       ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES,
+                       new HeartbeatServices(1_000L, 30_000L),
+                       
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
+                       null,
+                       new BlobCacheService(
+                               configuration,
+                               new VoidBlobStore(),
+                               null),
+                       testingFatalErrorHandlerResource.getFatalErrorHandler(),
+                       new TestingTaskExecutorPartitionTracker(),
+                       
TaskManagerRunner.createBackPressureSampleService(configuration, 
rpc.getScheduledExecutor()));
+       }
+
+       private static TaskDeploymentDescriptor 
createTaskDeploymentDescriptor(JobID jobId) throws IOException {
+               final ResultPartitionDeploymentDescriptor 
taskResultPartitionDescriptor =
+                       
PartitionTestUtils.createPartitionDeploymentDescriptor(ResultPartitionType.BLOCKING);
+
+               return 
TaskExecutorSubmissionTest.createTaskDeploymentDescriptor(
+                       jobId,
+                       "job",
+                       
taskResultPartitionDescriptor.getShuffleDescriptor().getResultPartitionID().getProducerId(),
+                       new SerializedValue<>(new ExecutionConfig()),
+                       "Sender",
+                       1,
+                       0,
+                       1,
+                       0,
+                       new Configuration(),
+                       new Configuration(),
+                       TestingInvokable.class.getName(),
+                       
Collections.singletonList(taskResultPartitionDescriptor),
+                       Collections.emptyList(),
+                       Collections.emptyList(),
+                       Collections.emptyList(),
+                       0);

Review comment:
       Can we use `TaskDeploymentDescriptorBuilder` here?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterExecutionDeploymentReconciliationTest.java
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.utils.JobGraphTestUtils;
+import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.jobmaster.utils.JobMasterBuilder;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.taskexecutor.AccumulatorReport;
+import org.apache.flink.runtime.taskexecutor.ExecutionDeploymentReport;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import 
org.apache.flink.runtime.taskexecutor.TaskExecutorToJobManagerHeartbeatPayload;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.LocalUnresolvedTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation;
+import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the execution deployment-reconciliation logic in the {@link 
JobMaster}.
+ */
+public class JobMasterExecutionDeploymentReconciliationTest extends TestLogger 
{
+
+       private static final Time testingTimeout = Time.seconds(10L);
+
+       private static TestingRpcService rpcService;
+
+       private final HeartbeatServices heartbeatServices = new 
HeartbeatServices(Integer.MAX_VALUE, Integer.MAX_VALUE);
+
+       private final TestingHighAvailabilityServices haServices = new 
TestingHighAvailabilityServices();
+       private final SettableLeaderRetrievalService 
resourceManagerLeaderRetriever = new SettableLeaderRetrievalService();
+       private final TestingLeaderElectionService 
resourceManagerLeaderElectionService = new TestingLeaderElectionService();
+
+       @Rule
+       public final TestingFatalErrorHandlerResource 
testingFatalErrorHandlerResource = new TestingFatalErrorHandlerResource();
+
+       @Before
+       public void setup() {
+               
haServices.setResourceManagerLeaderRetriever(resourceManagerLeaderRetriever);
+               
haServices.setResourceManagerLeaderElectionService(resourceManagerLeaderElectionService);
+               haServices.setCheckpointRecoveryFactory(new 
StandaloneCheckpointRecoveryFactory());
+       }
+
+       @After
+       public void shutdown() {
+               rpcService.clearGateways();
+       }
+
+       @BeforeClass
+       public static void setupClass() {
+               rpcService = new TestingRpcService();
+       }
+
+       @AfterClass
+       public static void shutdownClass() throws ExecutionException, 
InterruptedException {
+               rpcService.stopService().get();
+       }
+
+       @Test
+       public void testExecutionDeploymentReconciliation() throws Exception {
+               JobMasterBuilder.TestingOnCompletionActions onCompletionActions 
= new JobMasterBuilder.TestingOnCompletionActions();
+               JobMaster jobMaster = 
createAndStartJobMaster(onCompletionActions);
+               JobMasterGateway jobMasterGateway = 
jobMaster.getSelfGateway(JobMasterGateway.class);
+               rpcService.registerGateway(jobMasterGateway.getAddress(), 
jobMasterGateway);
+
+               final 
JobMasterPartitionReleaseTest.AllocationIdsResourceManagerGateway 
resourceManagerGateway = createResourceManagerGateway();

Review comment:
       One could make `AllocationIdsResourceManagerGateway` a top-level class instead of reaching into `JobMasterPartitionReleaseTest` for it.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/ExecutionDeploymentReport.java
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+
+import java.util.Set;
+
+/**
+ * A report about the currently deployed executions of a TaskExecutor.
+ */
+public class ExecutionDeploymentReport {

Review comment:
       ```suggestion
   public class ExecutionDeploymentReport implements Serializable {
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToJobManagerHeartbeatPayload.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import java.util.Collections;
+
+/**
+ * Payload for heartbeats sent from the TaskExecutor to the JobManager.
+ */
+public class TaskExecutorToJobManagerHeartbeatPayload {

Review comment:
       ```suggestion
   public class TaskExecutorToJobManagerHeartbeatPayload implements 
Serializable {
   
   private static final long serialVersionUID = ....
   ```

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterPartitionReleaseTest.java
##########
@@ -162,10 +162,10 @@ private void 
testPartitionReleaseOrPromotionOnJobTermination(Function<TestSetup,
                }
        }
 
-       private static final class AllocationIdsResourceManagerGateway extends 
TestingResourceManagerGateway {
+       public static final class AllocationIdsResourceManagerGateway extends 
TestingResourceManagerGateway {

Review comment:
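       Maybe make this a top-level class, since it is now also used outside of this test (e.g. by `JobMasterExecutionDeploymentReconciliationTest`).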

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorExecutionDeploymentReconciliationTest.java
##########
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.BlockerSync;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.blob.BlobCacheService;
+import org.apache.flink.runtime.blob.VoidBlobStore;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
+import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import 
org.apache.flink.runtime.io.network.partition.TestingTaskExecutorPartitionTracker;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmaster.AllocatedSlotInfo;
+import org.apache.flink.runtime.jobmaster.AllocatedSlotReport;
+import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
+import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
+import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import 
org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.taskexecutor.slot.TaskSlotUtils;
+import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource;
+import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.StreamSupport;
+
+import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
+import static org.hamcrest.core.IsCollectionContaining.hasItem;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the execution deployment-reconciliation logic in the {@link 
TaskExecutor}.
+ */
+public class TaskExecutorExecutionDeploymentReconciliationTest extends 
TestLogger {
+
+       private static final Time timeout = Time.seconds(10L);
+
+       private static TestingRpcService rpc;

Review comment:
       Maybe use `TestingRpcServiceResource` instead of managing the static `TestingRpcService` manually in `@BeforeClass`/`@AfterClass`.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorExecutionDeploymentReconciliationTest.java
##########
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.BlockerSync;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.blob.BlobCacheService;
+import org.apache.flink.runtime.blob.VoidBlobStore;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
+import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import 
org.apache.flink.runtime.io.network.partition.TestingTaskExecutorPartitionTracker;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmaster.AllocatedSlotInfo;
+import org.apache.flink.runtime.jobmaster.AllocatedSlotReport;
+import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
+import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
+import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import 
org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.taskexecutor.slot.TaskSlotUtils;
+import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource;
+import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.StreamSupport;
+
+import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
+import static org.hamcrest.core.IsCollectionContaining.hasItem;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the execution deployment-reconciliation logic in the {@link 
TaskExecutor}.
+ */
+public class TaskExecutorExecutionDeploymentReconciliationTest extends 
TestLogger {
+
+       private static final Time timeout = Time.seconds(10L);
+
+       private static TestingRpcService rpc;
+
+       private final TestingHighAvailabilityServices haServices = new 
TestingHighAvailabilityServices();
+       private final SettableLeaderRetrievalService jobManagerLeaderRetriever 
= new SettableLeaderRetrievalService();
+       private final SettableLeaderRetrievalService 
resourceManagerLeaderRetriever = new SettableLeaderRetrievalService();
+       private final JobID jobId = new JobID();
+
+       @Rule
+       public final TestingFatalErrorHandlerResource 
testingFatalErrorHandlerResource = new TestingFatalErrorHandlerResource();
+
+       @Before
+       public void setup() {
+               
haServices.setResourceManagerLeaderRetriever(resourceManagerLeaderRetriever);
+               haServices.setJobMasterLeaderRetriever(jobId, 
jobManagerLeaderRetriever);
+       }
+
+       @After
+       public void shutdown() {
+               rpc.clearGateways();
+       }
+
+       @BeforeClass
+       public static void setupClass() {
+               rpc = new TestingRpcService();
+       }
+
+       @AfterClass
+       public static void shutdownClass() throws ExecutionException, 
InterruptedException {
+               rpc.stopService().get();
+       }
+
+       @Test
+       public void testDeployedExecutionReporting() throws Exception {
+               final OneShotLatch slotOfferLatch = new OneShotLatch();
+               final BlockingQueue<Set<ExecutionAttemptID>> 
deployedExecutionsFuture = new ArrayBlockingQueue<>(3);
+               final CompletableFuture<Void> taskFinishedFuture = new 
CompletableFuture<>();
+               final ResourceID jobManagerResourceId = ResourceID.generate();
+               final TestingJobMasterGateway jobMasterGateway = 
setupJobManagerGateway(slotOfferLatch, deployedExecutionsFuture, 
taskFinishedFuture, jobManagerResourceId);
+
+               final CompletableFuture<SlotReport> initialSlotReportFuture = 
new CompletableFuture<>();
+               final TestingResourceManagerGateway 
testingResourceManagerGateway = 
setupResourceManagerGateway(initialSlotReportFuture);
+
+               final TaskManagerServices taskManagerServices = new 
TaskManagerServicesBuilder()
+                       .setTaskSlotTable(TaskSlotUtils.createTaskSlotTable(1, 
timeout))
+                       .setShuffleEnvironment(new 
NettyShuffleEnvironmentBuilder().build())
+                       .build();
+
+               final TestingTaskExecutor taskExecutor = 
createTestingTaskExecutor(taskManagerServices);
+
+               try {
+                       taskExecutor.start();
+                       taskExecutor.waitUntilStarted();
+
+                       final TaskExecutorGateway taskExecutorGateway = 
taskExecutor.getSelfGateway(TaskExecutorGateway.class);
+
+                       final TaskDeploymentDescriptor taskDeploymentDescriptor 
= createTaskDeploymentDescriptor(jobId);
+
+                       connectComponentsAndRequestSlot(jobMasterGateway, 
testingResourceManagerGateway, taskExecutorGateway, 
taskManagerServices.getJobLeaderService(), initialSlotReportFuture, 
taskDeploymentDescriptor.getAllocationId());
+
+                       TestingInvokable.sync = new BlockerSync();
+
+                       // This ensures TM has been successfully registered to 
JM.
+                       slotOfferLatch.await();
+
+                       AllocatedSlotReport slotAllocationReport = new 
AllocatedSlotReport(jobId, Collections.singleton(new AllocatedSlotInfo(0, 
taskDeploymentDescriptor.getAllocationId())));
+
+                       // nothing as deployed, so the deployment report should 
be empty
+                       
taskExecutorGateway.heartbeatFromJobManager(jobManagerResourceId, 
slotAllocationReport);
+                       assertThat(deployedExecutionsFuture.take(), hasSize(0));
+
+                       
taskExecutorGateway.submitTask(taskDeploymentDescriptor, 
jobMasterGateway.getFencingToken(), timeout)
+                               .get();
+
+                       TestingInvokable.sync.awaitBlocker();
+
+                       // task is deployed, so the deployment report should 
contain it
+                       
taskExecutorGateway.heartbeatFromJobManager(jobManagerResourceId, 
slotAllocationReport);
+                       assertThat(deployedExecutionsFuture.take(), 
hasItem(taskDeploymentDescriptor.getExecutionAttemptId()));
+
+                       TestingInvokable.sync.releaseBlocker();
+
+                       // task is finished ans was cleaned up, so the 
deployment report should be empty
+                       taskFinishedFuture.get();
+                       
taskExecutorGateway.heartbeatFromJobManager(jobManagerResourceId, 
slotAllocationReport);
+                       assertThat(deployedExecutionsFuture.take(), hasSize(0));
+               } finally {
+                       RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
+               }
+       }
+
+       /**
+        * Test invokable which completes the given future when executed.
+        */
+       public static class TestingInvokable extends AbstractInvokable {
+
+               static BlockerSync sync;
+
+               public TestingInvokable(Environment environment) {
+                       super(environment);
+               }
+
+               @Override
+               public void invoke() throws Exception {
+                       sync.block();
+               }
+       }
+
+       private TestingTaskExecutor 
createTestingTaskExecutor(TaskManagerServices taskManagerServices) throws 
IOException {
+               final Configuration configuration = new Configuration();
+               return new TestingTaskExecutor(
+                       rpc,
+                       TaskManagerConfiguration.fromConfiguration(
+                               configuration,
+                               
TaskExecutorResourceUtils.resourceSpecFromConfigForLocalExecution(configuration),
+                               
InetAddress.getLoopbackAddress().getHostAddress()),
+                       haServices,
+                       taskManagerServices,
+                       ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES,
+                       new HeartbeatServices(1_000L, 30_000L),
+                       
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
+                       null,
+                       new BlobCacheService(
+                               configuration,
+                               new VoidBlobStore(),
+                               null),
+                       testingFatalErrorHandlerResource.getFatalErrorHandler(),
+                       new TestingTaskExecutorPartitionTracker(),
+                       
TaskManagerRunner.createBackPressureSampleService(configuration, 
rpc.getScheduledExecutor()));
+       }
+
+       private static TaskDeploymentDescriptor 
createTaskDeploymentDescriptor(JobID jobId) throws IOException {
+               final ResultPartitionDeploymentDescriptor 
taskResultPartitionDescriptor =
+                       
PartitionTestUtils.createPartitionDeploymentDescriptor(ResultPartitionType.BLOCKING);
+
+               return 
TaskExecutorSubmissionTest.createTaskDeploymentDescriptor(
+                       jobId,
+                       "job",
+                       
taskResultPartitionDescriptor.getShuffleDescriptor().getResultPartitionID().getProducerId(),
+                       new SerializedValue<>(new ExecutionConfig()),
+                       "Sender",
+                       1,
+                       0,
+                       1,
+                       0,
+                       new Configuration(),
+                       new Configuration(),
+                       TestingInvokable.class.getName(),
+                       
Collections.singletonList(taskResultPartitionDescriptor),
+                       Collections.emptyList(),
+                       Collections.emptyList(),
+                       Collections.emptyList(),
+                       0);
+       }
+
+       private static TestingJobMasterGateway 
setupJobManagerGateway(OneShotLatch slotOfferLatch, 
BlockingQueue<Set<ExecutionAttemptID>> deployedExecutionsFuture, 
CompletableFuture<Void> taskFinishedFuture, ResourceID jobManagerResourceId) {
+               return new TestingJobMasterGatewayBuilder()
+                       .setRegisterTaskManagerFunction((s, location) -> 
CompletableFuture.completedFuture(new 
JMTMRegistrationSuccess(jobManagerResourceId)))
+                       .setOfferSlotsFunction((resourceID, slotOffers) -> {
+                               slotOfferLatch.trigger();
+                               return 
CompletableFuture.completedFuture(slotOffers);
+                       })
+                       .setTaskManagerHeartbeatConsumer((resourceID, 
taskExecutorToJobManagerHeartbeatPayload) -> {
+                               ExecutionDeploymentReport 
executionDeploymentReport = 
taskExecutorToJobManagerHeartbeatPayload.getExecutionDeploymentReport();
+                               
deployedExecutionsFuture.add(executionDeploymentReport.getExecutions());
+                       })
+                       .setUpdateTaskExecutionStateFunction(taskExecutionState 
-> {
+                               if (taskExecutionState.getExecutionState() == 
ExecutionState.FINISHED) {
+                                       taskFinishedFuture.complete(null);
+                               }
+                               return 
CompletableFuture.completedFuture(Acknowledge.get());
+                       })
+                       .build();
+       }
+
+       private static TestingResourceManagerGateway 
setupResourceManagerGateway(CompletableFuture<SlotReport> 
initialSlotReportFuture) {
+               final TestingResourceManagerGateway 
testingResourceManagerGateway = new TestingResourceManagerGateway();
+               
testingResourceManagerGateway.setSendSlotReportFunction(resourceIDInstanceIDSlotReportTuple3
 -> {
+                       
initialSlotReportFuture.complete(resourceIDInstanceIDSlotReportTuple3.f2);
+                       return 
CompletableFuture.completedFuture(Acknowledge.get());
+               });
+               
testingResourceManagerGateway.setRegisterTaskExecutorFunction(input -> 
CompletableFuture.completedFuture(
+                       new TaskExecutorRegistrationSuccess(
+                               new InstanceID(),
+                               
testingResourceManagerGateway.getOwnResourceId(),
+                               new ClusterInformation("blobServerHost", 
55555))));
+               return testingResourceManagerGateway;
+       }
+
+       private void connectComponentsAndRequestSlot(
+                       JobMasterGateway jobMasterGateway,
+                       ResourceManagerGateway resourceManagerGateway,
+                       TaskExecutorGateway taskExecutorGateway,
+                       JobLeaderService jobLeaderService,
+                       CompletableFuture<SlotReport> initialSlotReportFuture,
+                       AllocationID allocationId) throws Exception {
+               final String jobMasterAddress = "jm";
+               rpc.registerGateway(jobMasterAddress, jobMasterGateway);
+               rpc.registerGateway(resourceManagerGateway.getAddress(), 
resourceManagerGateway);
+
+               // inform the task manager about the job leader
+               jobLeaderService.addJob(jobId, jobMasterAddress);
+               jobManagerLeaderRetriever.notifyListener(jobMasterAddress, 
UUID.randomUUID());
+               
resourceManagerLeaderRetriever.notifyListener(resourceManagerGateway.getAddress(),
 resourceManagerGateway.getFencingToken().toUUID());
+
+               final Optional<SlotStatus> slotStatusOptional = 
StreamSupport.stream(initialSlotReportFuture.get().spliterator(), false)
+                       .findAny();
+
+               assertTrue(slotStatusOptional.isPresent());
+
+               while (true) {
+                       try {
+                               taskExecutorGateway.requestSlot(
+                                       slotStatusOptional.get().getSlotID(),
+                                       jobId,
+                                       allocationId,
+                                       ResourceProfile.ZERO,
+                                       jobMasterAddress,
+                                       
resourceManagerGateway.getFencingToken(),
+                                       timeout
+                               ).get();
+                               break;
+                       } catch (Exception e) {
+                               // the proper establishment of the RM 
connection is tracked
+                               // asynchronously, so we have to poll here 
until it went through
+                               // until then, slot requests will fail with an 
exception
+                               Thread.sleep(50);
+                       }
+               }

Review comment:
       If you want to wait until the `TaskExecutor` has established the 
connection to the `ResourceManager`, you can wait for the 
`ResourceManagerGateway.sendSlotReport` call instead of polling `requestSlot` in a loop.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorExecutionDeploymentReconciliationTest.java
##########
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.BlockerSync;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.blob.BlobCacheService;
+import org.apache.flink.runtime.blob.VoidBlobStore;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
+import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import 
org.apache.flink.runtime.io.network.partition.TestingTaskExecutorPartitionTracker;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmaster.AllocatedSlotInfo;
+import org.apache.flink.runtime.jobmaster.AllocatedSlotReport;
+import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
+import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
+import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import 
org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.taskexecutor.slot.TaskSlotUtils;
+import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource;
+import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.StreamSupport;
+
+import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
+import static org.hamcrest.core.IsCollectionContaining.hasItem;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the execution deployment-reconciliation logic in the {@link 
TaskExecutor}.
+ */
+public class TaskExecutorExecutionDeploymentReconciliationTest extends 
TestLogger {
+
+       private static final Time timeout = Time.seconds(10L);
+
+       private static TestingRpcService rpc;
+
+       private final TestingHighAvailabilityServices haServices = new 
TestingHighAvailabilityServices();
+       private final SettableLeaderRetrievalService jobManagerLeaderRetriever 
= new SettableLeaderRetrievalService();
+       private final SettableLeaderRetrievalService 
resourceManagerLeaderRetriever = new SettableLeaderRetrievalService();
+       private final JobID jobId = new JobID();
+
+       @Rule
+       public final TestingFatalErrorHandlerResource 
testingFatalErrorHandlerResource = new TestingFatalErrorHandlerResource();
+
+       @Before
+       public void setup() {
+               
haServices.setResourceManagerLeaderRetriever(resourceManagerLeaderRetriever);
+               haServices.setJobMasterLeaderRetriever(jobId, 
jobManagerLeaderRetriever);
+       }
+
+       @After
+       public void shutdown() {
+               rpc.clearGateways();
+       }
+
+       @BeforeClass
+       public static void setupClass() {
+               rpc = new TestingRpcService();
+       }
+
+       @AfterClass
+       public static void shutdownClass() throws ExecutionException, 
InterruptedException {
+               rpc.stopService().get();
+       }
+
+       @Test
+       public void testDeployedExecutionReporting() throws Exception {
+               final OneShotLatch slotOfferLatch = new OneShotLatch();
+               final BlockingQueue<Set<ExecutionAttemptID>> 
deployedExecutionsFuture = new ArrayBlockingQueue<>(3);

Review comment:
       ```suggestion
                final BlockingQueue<Set<ExecutionAttemptID>> 
deployedExecutionsQueue = new ArrayBlockingQueue<>(3);
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to