tillrohrmann commented on a change in pull request #12815: URL: https://github.com/apache/flink/pull/12815#discussion_r449579733
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/ExecutionDeploymentReconcilerImpl.java ########## @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster; + +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.taskexecutor.ExecutionDeploymentReport; + +import java.util.HashSet; +import java.util.Set; + +/** + * Default {@link ExecutionDeploymentReconciler} implementation. Detects missing/unknown deployments, and defers + * to a provided {@link ExecutionDeploymentReconciliationHandler} to resolve them. + */ +public class ExecutionDeploymentReconcilerImpl implements ExecutionDeploymentReconciler { Review comment: ```suggestion public class DefaultExecutionDeploymentReconciler implements ExecutionDeploymentReconciler { ``` It is not a lot more specific than `XYZImpl` but a bit since it says that this is the default implementation. 
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/ExecutionDeploymentReconcilerImpl.java ########## @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster; + +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.taskexecutor.ExecutionDeploymentReport; + +import java.util.HashSet; +import java.util.Set; + +/** + * Default {@link ExecutionDeploymentReconciler} implementation. Detects missing/unknown deployments, and defers + * to a provided {@link ExecutionDeploymentReconciliationHandler} to resolve them. 
+ */ +public class ExecutionDeploymentReconcilerImpl implements ExecutionDeploymentReconciler { + + private final ExecutionDeploymentReconciliationHandler handler; + + public ExecutionDeploymentReconcilerImpl(ExecutionDeploymentReconciliationHandler handler) { + this.handler = handler; + } + + @Override + public void reconcileExecutionDeployments(ResourceID taskExecutorHost, ExecutionDeploymentReport executionDeploymentReport, Set<ExecutionAttemptID> expectedDeployedExecutions) { + final Set<ExecutionAttemptID> executions = new HashSet<>(expectedDeployedExecutions); + + executionDeploymentReport.getExecutions().forEach(executionAttemptID -> { Review comment: I prefer the KISS principle and would simply use the for-each loop here. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ########## @@ -753,8 +753,11 @@ public void deploy() throws JobException { .thenCompose(Function.identity()) .whenCompleteAsync( (ack, failure) -> { - // only respond to the failure case - if (failure != null) { + if (failure == null) { + getExecutionGraph() + .getExecutionDeploymentListener() Review comment: This call is effectively `getVertex().getExecutionGraph().getExecutionDeploymentListener()`, which violates the Law of Demeter. It couples `Execution` to the implementation detail of where the `ExecutionDeploymentListener` lives. 
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ########## @@ -224,10 +227,34 @@ public JobMaster( ClassLoader userCodeLoader, SchedulerNGFactory schedulerNGFactory, ShuffleMaster<?> shuffleMaster, - PartitionTrackerFactory partitionTrackerFactory) throws Exception { + PartitionTrackerFactory partitionTrackerFactory, + ExecutionDeploymentTracker executionDeploymentTracker, + ExecutionDeploymentReconciler.Factory executionDeploymentReconcilerFactory) throws Exception { super(rpcService, AkkaRpcServiceUtils.createRandomName(JOB_MANAGER_NAME), null); + final ExecutionDeploymentReconciliationHandler executionStateReconciliationHandler = new ExecutionDeploymentReconciliationHandler() { + @Override + public void onMissingDeployment(ExecutionAttemptID deployment) { + log.debug("Failing deployment {} due to no longer being deployed.", deployment); + schedulerNG.updateTaskExecutionState(new TaskExecutionState( + jobGraph.getJobID(), deployment, ExecutionState.FAILED, new FlinkException("State de-sync") Review comment: Maybe add a bit more specific exception message. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/ExecutionDeploymentTracker.java ########## @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster; + +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; + +import java.util.Set; + +/** + * A tracker for deployed executions. + */ +public interface ExecutionDeploymentTracker { + + /** + * Starts tracking the given execution that was deployed on the given host. + * + * @param deployment deployment to start tracking + * @param host hosting task executor + */ + void startTrackingDeployment(ExecutionAttemptID deployment, ResourceID host); + + /** + * Stops tracking the given execution. + * + * @param deployment deployment to stop tracking + */ + void stopTrackingDeployment(ExecutionAttemptID deployment); + + /** + * Returns all tracked executions for the given host. + * + * @param host hosting task executor + * @return tracked executions + */ + Set<ExecutionAttemptID> getExecutions(ResourceID host); Review comment: ```suggestion Set<ExecutionAttemptID> getExecutionsOn(ResourceID host); ``` or maybe `getExecutionsOnHost` ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/ExecutionDeploymentTracker.java ########## @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster; + +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; + +import java.util.Set; + +/** + * A tracker for deployed executions. + */ +public interface ExecutionDeploymentTracker { + + /** + * Starts tracking the given execution that was deployed on the given host. + * + * @param deployment deployment to start tracking + * @param host hosting task executor + */ + void startTrackingDeployment(ExecutionAttemptID deployment, ResourceID host); + + /** + * Stops tracking the given execution. + * + * @param deployment deployment to stop tracking + */ + void stopTrackingDeployment(ExecutionAttemptID deployment); Review comment: ```suggestion void stopTrackingDeploymentOf(ExecutionAttemptID executionAttemptId); ``` ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToJobManagerHeartbeatPayload.java ########## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskexecutor; + +import java.util.Collections; + +/** + * Payload for heartbeats sent from the TaskExecutor to the JobManager. + */ +public class TaskExecutorToJobManagerHeartbeatPayload { Review comment: the `AkkaRpcService` requires that all messages are serializable. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterExecutionDeploymentReconciliationTest.java ########## @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; +import org.apache.flink.runtime.jobgraph.utils.JobGraphTestUtils; +import org.apache.flink.runtime.jobmanager.OnCompletionActions; +import org.apache.flink.runtime.jobmaster.utils.JobMasterBuilder; +import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; +import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.rpc.TestingRpcService; +import org.apache.flink.runtime.taskexecutor.AccumulatorReport; +import org.apache.flink.runtime.taskexecutor.ExecutionDeploymentReport; +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; +import org.apache.flink.runtime.taskexecutor.TaskExecutorToJobManagerHeartbeatPayload; +import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway; +import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder; +import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; +import org.apache.flink.runtime.taskmanager.LocalUnresolvedTaskManagerLocation; +import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation; +import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource; +import org.apache.flink.util.TestLogger; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; + 
+import java.util.Collection; +import java.util.Collections; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; + +/** + * Tests for the execution deployment-reconciliation logic in the {@link JobMaster}. + */ +public class JobMasterExecutionDeploymentReconciliationTest extends TestLogger { + + private static final Time testingTimeout = Time.seconds(10L); + + private static TestingRpcService rpcService; Review comment: You could also use `TestingRpcServiceResource` here. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ########## @@ -753,8 +753,11 @@ public void deploy() throws JobException { .thenCompose(Function.identity()) .whenCompleteAsync( (ack, failure) -> { - // only respond to the failure case - if (failure != null) { + if (failure == null) { + getExecutionGraph() + .getExecutionDeploymentListener() + .onCompletedDeployment(attemptId, getAssignedResourceLocation().getResourceID()); Review comment: What happens if this `Execution` is no longer valid (e.g. it has failed in the meantime)? Shouldn't we guard against this situation, similar to what we do in `ExecutionVertex.notifyStateTransition`, by checking what the `currentExecution` is? ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ########## @@ -1601,6 +1604,8 @@ private boolean transitionState(ExecutionState currentState, ExecutionState targ } } + getExecutionGraph().getExecutionStateUpdateListener().onStateUpdate(attemptId, state); Review comment: I think we can move this to `ExecutionGraph.notifyExecutionChange`. That way we will only react to signals coming from a valid `Execution`. 
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/ExecutionDeploymentReconciler.java ########## @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster; + +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.taskexecutor.ExecutionDeploymentReport; + +import java.util.Set; + +/** + * Component for reconciling the deployment state of executions. + */ +public interface ExecutionDeploymentReconciler { + + /** + * Factory for {@link ExecutionDeploymentReconciler}. + */ + interface Factory { + ExecutionDeploymentReconciler get(ExecutionDeploymentReconciliationHandler reconciliationHandler); Review comment: Maybe ```suggestion ExecutionDeploymentReconciler create(ExecutionDeploymentReconciliationHandler reconciliationHandler); ``` ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/ExecutionDeploymentReconciler.java ########## @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster; + +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.taskexecutor.ExecutionDeploymentReport; + +import java.util.Set; + +/** + * Component for reconciling the deployment state of executions. + */ +public interface ExecutionDeploymentReconciler { + + /** + * Factory for {@link ExecutionDeploymentReconciler}. + */ + interface Factory { + ExecutionDeploymentReconciler get(ExecutionDeploymentReconciliationHandler reconciliationHandler); + } + + /** + * Reconciles the deployment states between all reported/expected executions for the given task executor. + * + * @param taskExecutorHost Review comment: Description is missing ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/ExecutionDeploymentReconciliationHandler.java ########## @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster; + +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; + +/** + * Interface for triggering actions in case of state mismatches. + */ +public interface ExecutionDeploymentReconciliationHandler { + /** + * Called if an execution is expected to be hosted on a task executor, but isn't. + */ + void onMissingDeployment(ExecutionAttemptID deployment); Review comment: ```suggestion void onMissingDeployment(ExecutionAttemptID executionAttemptId); ``` not sure whether the parameter should really be named `deployment`. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/ExecutionDeploymentTracker.java ########## @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster; + +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; + +import java.util.Set; + +/** + * A tracker for deployed executions. + */ +public interface ExecutionDeploymentTracker { + + /** + * Starts tracking the given execution that was deployed on the given host. + * + * @param deployment deployment to start tracking + * @param host hosting task executor + */ + void startTrackingDeployment(ExecutionAttemptID deployment, ResourceID host); Review comment: ```suggestion void startTrackingDeploymentOf(ExecutionAttemptID executionAttemptId, ResourceID host); ``` ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/ExecutionDeploymentReconciliationHandler.java ########## @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster; + +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; + +/** + * Interface for triggering actions in case of state mismatches. + */ +public interface ExecutionDeploymentReconciliationHandler { + /** + * Called if an execution is expected to be hosted on a task executor, but isn't. + */ Review comment: The JavaDocs are not complete. For interfaces I think we should provide complete JavaDocs. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/ExecutionDeploymentTrackerImpl.java ########## @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster; + +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.ExecutionDeploymentListener; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** + * Default {@link ExecutionDeploymentTracker} implementation. + */ +public class ExecutionDeploymentTrackerImpl implements ExecutionDeploymentTracker, ExecutionDeploymentListener { Review comment: ```suggestion public class DefaultExecutionDeploymentTracker implements ExecutionDeploymentTracker, ExecutionDeploymentListener { ``` ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/ExecutionDeploymentTrackerImpl.java ########## @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster; + +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.ExecutionDeploymentListener; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** + * Default {@link ExecutionDeploymentTracker} implementation. + */ +public class ExecutionDeploymentTrackerImpl implements ExecutionDeploymentTracker, ExecutionDeploymentListener { + + private final Map<ResourceID, Set<ExecutionAttemptID>> executionsByHost = new HashMap<>(); + private final Map<ExecutionAttemptID, ResourceID> hostByExecution = new HashMap<>(); + + @Override + public void startTrackingDeployment(ExecutionAttemptID execution, ResourceID host) { + hostByExecution.put(execution, host); + executionsByHost.compute(host, (resourceID, executionAttemptIds) -> { + if (executionAttemptIds == null) { + executionAttemptIds = new HashSet<>(); + } + executionAttemptIds.add(execution); + return executionAttemptIds; + }); Review comment: ```suggestion executionsByHost.computeIfAbsent(host, ignored -> new HashSet<>()).add(execution); ``` ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/ExecutionDeploymentReconcilerImpl.java ########## @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster; + +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.taskexecutor.ExecutionDeploymentReport; + +import java.util.HashSet; +import java.util.Set; + +/** + * Default {@link ExecutionDeploymentReconciler} implementation. Detects missing/unknown deployments, and defers + * to a provided {@link ExecutionDeploymentReconciliationHandler} to resolve them. + */ +public class ExecutionDeploymentReconcilerImpl implements ExecutionDeploymentReconciler { + + private final ExecutionDeploymentReconciliationHandler handler; + + public ExecutionDeploymentReconcilerImpl(ExecutionDeploymentReconciliationHandler handler) { + this.handler = handler; + } + + @Override + public void reconcileExecutionDeployments(ResourceID taskExecutorHost, ExecutionDeploymentReport executionDeploymentReport, Set<ExecutionAttemptID> expectedDeployedExecutions) { + final Set<ExecutionAttemptID> executions = new HashSet<>(expectedDeployedExecutions); + + executionDeploymentReport.getExecutions().forEach(executionAttemptID -> { + boolean isTracked = executions.remove(executionAttemptID); + if (!isTracked) { + handler.onUnknownDeployment(executionAttemptID, taskExecutorHost); Review comment: Does it make sense to batch the executions which are unknown? That way we would not have to make a lookup for every unknown `Execution`. 
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/ExecutionDeploymentReconcilerImpl.java ########## @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster; + +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.taskexecutor.ExecutionDeploymentReport; + +import java.util.HashSet; +import java.util.Set; + +/** + * Default {@link ExecutionDeploymentReconciler} implementation. Detects missing/unknown deployments, and defers + * to a provided {@link ExecutionDeploymentReconciliationHandler} to resolve them. 
+ */ +public class ExecutionDeploymentReconcilerImpl implements ExecutionDeploymentReconciler { + + private final ExecutionDeploymentReconciliationHandler handler; + + public ExecutionDeploymentReconcilerImpl(ExecutionDeploymentReconciliationHandler handler) { + this.handler = handler; + } + + @Override + public void reconcileExecutionDeployments(ResourceID taskExecutorHost, ExecutionDeploymentReport executionDeploymentReport, Set<ExecutionAttemptID> expectedDeployedExecutions) { + final Set<ExecutionAttemptID> executions = new HashSet<>(expectedDeployedExecutions); + + executionDeploymentReport.getExecutions().forEach(executionAttemptID -> { + boolean isTracked = executions.remove(executionAttemptID); + if (!isTracked) { + handler.onUnknownDeployment(executionAttemptID, taskExecutorHost); + } + }); + executions.forEach(handler::onMissingDeployment); Review comment: Same here. If `handler.onMissingDeployments` takes a collection of `Execution` we only would have to call it once. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/ExecutionDeploymentTrackerImplTest.java ########## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster; + +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.core.IsCollectionContaining.hasItems; +import static org.junit.Assert.assertThat; + +/** + * Tests for {@link ExecutionDeploymentTrackerImpl}. + */ +public class ExecutionDeploymentTrackerImplTest extends TestLogger { + + @Test + public void testStartTracking() { + final ExecutionDeploymentTrackerImpl tracker = new ExecutionDeploymentTrackerImpl(); + + final ExecutionAttemptID attemptId1 = new ExecutionAttemptID(); + final ResourceID resourceId1 = ResourceID.generate(); + tracker.startTrackingDeployment(attemptId1, resourceId1); + + assertThat(tracker.getExecutions(resourceId1), hasItems(attemptId1)); + } + + @Test + public void testOnCompleteEquivalentToStartTracking() { + final ExecutionDeploymentTrackerImpl tracker = new ExecutionDeploymentTrackerImpl(); + + final ExecutionAttemptID attemptId1 = new ExecutionAttemptID(); + final ResourceID resourceId1 = ResourceID.generate(); + tracker.onCompletedDeployment(attemptId1, resourceId1); + + assertThat(tracker.getExecutions(resourceId1), hasItems(attemptId1)); + } + + @Test + public void testStopTracking() { + final ExecutionDeploymentTrackerImpl tracker = new ExecutionDeploymentTrackerImpl(); + + final ExecutionAttemptID attemptId1 = new ExecutionAttemptID(); + final ResourceID resourceId1 = ResourceID.generate(); + tracker.startTrackingDeployment(attemptId1, resourceId1); + + tracker.stopTrackingDeployment(attemptId1); + + assertThat(tracker.getExecutions(resourceId1), empty()); + } + + @Test + public void testStopTrackingDoesNotAffectOtherIds() { + final ExecutionDeploymentTrackerImpl tracker = new ExecutionDeploymentTrackerImpl(); + + 
final ExecutionAttemptID attemptId1 = new ExecutionAttemptID(); + final ResourceID resourceId1 = ResourceID.generate(); + tracker.startTrackingDeployment(attemptId1, resourceId1); + + tracker.stopTrackingDeployment(new ExecutionAttemptID()); + + assertThat(tracker.getExecutions(resourceId1), hasItems(attemptId1)); + } + + @Test + public void testStopTrackingUnknownExecutionDoesNotThrowException() { + final ExecutionDeploymentTrackerImpl tracker = new ExecutionDeploymentTrackerImpl(); + + final ExecutionAttemptID attemptId2 = new ExecutionAttemptID(); + tracker.stopTrackingDeployment(attemptId2); + } + + @Test + public void testGetExecutionsReturnsEmptySetForUnknownHost() { + final ExecutionDeploymentTrackerImpl tracker = new ExecutionDeploymentTrackerImpl(); + + assertThat(tracker.getExecutions(ResourceID.generate()), notNullValue()); Review comment: Should we also assert that the returned `Collection` is empty? ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/ExecutionDeploymentTrackerImpl.java ########## @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster; + +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.ExecutionDeploymentListener; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** + * Default {@link ExecutionDeploymentTracker} implementation. + */ +public class ExecutionDeploymentTrackerImpl implements ExecutionDeploymentTracker, ExecutionDeploymentListener { Review comment: Why does this class implement `ExecutionDeploymentListener`? I think it is nowhere used. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/ExecutionDeploymentReconcilerImplTest.java ########## @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.taskexecutor.ExecutionDeploymentReport; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.concurrent.CompletableFuture; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; + +/** + * Tests for {@link ExecutionDeploymentReconcilerImpl}. + */ +public class ExecutionDeploymentReconcilerImplTest extends TestLogger { + + @Test + public void testMatchingDeployments() throws Exception { + runTest((reconciler, missingFuture, unknownFuture) -> { + ResourceID resourceId = ResourceID.generate(); + ExecutionAttemptID attemptId = new ExecutionAttemptID(); + + reconciler.reconcileExecutionDeployments( + resourceId, + new ExecutionDeploymentReport(Collections.singleton(attemptId)), + Collections.singleton(attemptId)); + + assertFalse(missingFuture.isDone()); + assertFalse(unknownFuture.isDone()); + }); + } Review comment: It's mainly a personal preference but I think tests are easier to understand if one knows what's going on. With `runTest` a lot of details are being hidden. For example, the suggestion below makes it a bit easier to understand what `missingFuture` and `unknownFuture` actually are. 
```suggestion @Test public void test() { final TestingExecutionDeploymentReconciliationHandler handler = new TestingExecutionDeploymentReconciliationHandler(); final ExecutionDeploymentReconciler reconciler = new ExecutionDeploymentReconcilerImpl(handler); ResourceID resourceId = ResourceID.generate(); ExecutionAttemptID attemptId = new ExecutionAttemptID(); reconciler.reconcileExecutionDeployments( resourceId, new ExecutionDeploymentReport(Collections.singleton(attemptId)), Collections.singleton(attemptId)); assertThat(handler.getMissingDeployments(), is(empty())); assertFalse(handler.getUnknownDeployments(), is(empty())); } ``` ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterExecutionDeploymentReconciliationTest.java ########## @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; +import org.apache.flink.runtime.jobgraph.utils.JobGraphTestUtils; +import org.apache.flink.runtime.jobmanager.OnCompletionActions; +import org.apache.flink.runtime.jobmaster.utils.JobMasterBuilder; +import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; +import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.rpc.TestingRpcService; +import org.apache.flink.runtime.taskexecutor.AccumulatorReport; +import org.apache.flink.runtime.taskexecutor.ExecutionDeploymentReport; +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; +import org.apache.flink.runtime.taskexecutor.TaskExecutorToJobManagerHeartbeatPayload; +import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway; +import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder; +import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; +import org.apache.flink.runtime.taskmanager.LocalUnresolvedTaskManagerLocation; +import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation; +import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource; +import org.apache.flink.util.TestLogger; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; + 
+import java.util.Collection; +import java.util.Collections; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; + +/** + * Tests for the execution deployment-reconciliation logic in the {@link JobMaster}. + */ +public class JobMasterExecutionDeploymentReconciliationTest extends TestLogger { + + private static final Time testingTimeout = Time.seconds(10L); + + private static TestingRpcService rpcService; + + private final HeartbeatServices heartbeatServices = new HeartbeatServices(Integer.MAX_VALUE, Integer.MAX_VALUE); + + private final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices(); + private final SettableLeaderRetrievalService resourceManagerLeaderRetriever = new SettableLeaderRetrievalService(); + private final TestingLeaderElectionService resourceManagerLeaderElectionService = new TestingLeaderElectionService(); + + @Rule + public final TestingFatalErrorHandlerResource testingFatalErrorHandlerResource = new TestingFatalErrorHandlerResource(); + + @Before + public void setup() { + haServices.setResourceManagerLeaderRetriever(resourceManagerLeaderRetriever); + haServices.setResourceManagerLeaderElectionService(resourceManagerLeaderElectionService); + haServices.setCheckpointRecoveryFactory(new StandaloneCheckpointRecoveryFactory()); + } + + @After + public void shutdown() { + rpcService.clearGateways(); + } + + @BeforeClass + public static void setupClass() { + rpcService = new TestingRpcService(); + } + + @AfterClass + public static void shutdownClass() throws ExecutionException, InterruptedException { + rpcService.stopService().get(); + } + + @Test + public void testExecutionDeploymentReconciliation() throws Exception { + JobMasterBuilder.TestingOnCompletionActions onCompletionActions = new 
JobMasterBuilder.TestingOnCompletionActions(); + JobMaster jobMaster = createAndStartJobMaster(onCompletionActions); + JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class); + rpcService.registerGateway(jobMasterGateway.getAddress(), jobMasterGateway); + + final JobMasterPartitionReleaseTest.AllocationIdsResourceManagerGateway resourceManagerGateway = createResourceManagerGateway(); + + final CompletableFuture<Void> taskSubmissionFuture = new CompletableFuture<>(); + final CompletableFuture<ExecutionAttemptID> taskCancellationFuture = new CompletableFuture<>(); + TaskExecutorGateway taskExecutorGateway = createTaskExecutorGateway(taskSubmissionFuture, taskCancellationFuture); + LocalUnresolvedTaskManagerLocation localUnresolvedTaskManagerLocation = new LocalUnresolvedTaskManagerLocation(); + + registerTaskExecutorAndOfferSlots(resourceManagerGateway, jobMasterGateway, taskExecutorGateway, localUnresolvedTaskManagerLocation); + + taskSubmissionFuture.get(); Review comment: I think this test is unstable because the completion of this future does not guarantee that we are tracking the respective task. If you add a `Thread.sleep()` in line 161, then you can reproduce it. The reason is that we use a `whenCompleteAsync` when deploying an `Execution`. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorExecutionDeploymentReconciliationTest.java ########## @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskexecutor; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.testutils.BlockerSync; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.blob.BlobCacheService; +import org.apache.flink.runtime.blob.VoidBlobStore; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; +import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; +import org.apache.flink.runtime.entrypoint.ClusterInformation; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; +import org.apache.flink.runtime.instance.InstanceID; +import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder; +import org.apache.flink.runtime.io.network.partition.PartitionTestUtils; +import 
org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.io.network.partition.TestingTaskExecutorPartitionTracker; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobmaster.AllocatedSlotInfo; +import org.apache.flink.runtime.jobmaster.AllocatedSlotReport; +import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess; +import org.apache.flink.runtime.jobmaster.JobMasterGateway; +import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway; +import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder; +import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; +import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.runtime.rpc.TestingRpcService; +import org.apache.flink.runtime.taskexecutor.slot.TaskSlotUtils; +import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource; +import org.apache.flink.util.SerializedValue; +import org.apache.flink.util.TestLogger; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; + +import java.io.IOException; +import java.net.InetAddress; +import java.util.Collections; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.stream.StreamSupport; + +import static org.hamcrest.collection.IsCollectionWithSize.hasSize; +import static 
org.hamcrest.core.IsCollectionContaining.hasItem; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Tests for the execution deployment-reconciliation logic in the {@link TaskExecutor}. + */ +public class TaskExecutorExecutionDeploymentReconciliationTest extends TestLogger { + + private static final Time timeout = Time.seconds(10L); + + private static TestingRpcService rpc; + + private final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices(); + private final SettableLeaderRetrievalService jobManagerLeaderRetriever = new SettableLeaderRetrievalService(); + private final SettableLeaderRetrievalService resourceManagerLeaderRetriever = new SettableLeaderRetrievalService(); + private final JobID jobId = new JobID(); + + @Rule + public final TestingFatalErrorHandlerResource testingFatalErrorHandlerResource = new TestingFatalErrorHandlerResource(); + + @Before + public void setup() { + haServices.setResourceManagerLeaderRetriever(resourceManagerLeaderRetriever); + haServices.setJobMasterLeaderRetriever(jobId, jobManagerLeaderRetriever); + } + + @After + public void shutdown() { + rpc.clearGateways(); + } + + @BeforeClass + public static void setupClass() { + rpc = new TestingRpcService(); + } + + @AfterClass + public static void shutdownClass() throws ExecutionException, InterruptedException { + rpc.stopService().get(); + } + + @Test + public void testDeployedExecutionReporting() throws Exception { + final OneShotLatch slotOfferLatch = new OneShotLatch(); + final BlockingQueue<Set<ExecutionAttemptID>> deployedExecutionsFuture = new ArrayBlockingQueue<>(3); + final CompletableFuture<Void> taskFinishedFuture = new CompletableFuture<>(); + final ResourceID jobManagerResourceId = ResourceID.generate(); + final TestingJobMasterGateway jobMasterGateway = setupJobManagerGateway(slotOfferLatch, deployedExecutionsFuture, taskFinishedFuture, jobManagerResourceId); + + final CompletableFuture<SlotReport> 
initialSlotReportFuture = new CompletableFuture<>(); + final TestingResourceManagerGateway testingResourceManagerGateway = setupResourceManagerGateway(initialSlotReportFuture); + + final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder() + .setTaskSlotTable(TaskSlotUtils.createTaskSlotTable(1, timeout)) + .setShuffleEnvironment(new NettyShuffleEnvironmentBuilder().build()) + .build(); + + final TestingTaskExecutor taskExecutor = createTestingTaskExecutor(taskManagerServices); + + try { + taskExecutor.start(); + taskExecutor.waitUntilStarted(); + + final TaskExecutorGateway taskExecutorGateway = taskExecutor.getSelfGateway(TaskExecutorGateway.class); + + final TaskDeploymentDescriptor taskDeploymentDescriptor = createTaskDeploymentDescriptor(jobId); + + connectComponentsAndRequestSlot(jobMasterGateway, testingResourceManagerGateway, taskExecutorGateway, taskManagerServices.getJobLeaderService(), initialSlotReportFuture, taskDeploymentDescriptor.getAllocationId()); + + TestingInvokable.sync = new BlockerSync(); + + // This ensures TM has been successfully registered to JM. 
+ slotOfferLatch.await(); + + AllocatedSlotReport slotAllocationReport = new AllocatedSlotReport(jobId, Collections.singleton(new AllocatedSlotInfo(0, taskDeploymentDescriptor.getAllocationId()))); + + // nothing was deployed, so the deployment report should be empty + taskExecutorGateway.heartbeatFromJobManager(jobManagerResourceId, slotAllocationReport); + assertThat(deployedExecutionsFuture.take(), hasSize(0)); + + taskExecutorGateway.submitTask(taskDeploymentDescriptor, jobMasterGateway.getFencingToken(), timeout) + .get(); + + TestingInvokable.sync.awaitBlocker(); + + // task is deployed, so the deployment report should contain it + taskExecutorGateway.heartbeatFromJobManager(jobManagerResourceId, slotAllocationReport); + assertThat(deployedExecutionsFuture.take(), hasItem(taskDeploymentDescriptor.getExecutionAttemptId())); + + TestingInvokable.sync.releaseBlocker(); + + // task is finished and was cleaned up, so the deployment report should be empty + taskFinishedFuture.get(); + taskExecutorGateway.heartbeatFromJobManager(jobManagerResourceId, slotAllocationReport); + assertThat(deployedExecutionsFuture.take(), hasSize(0)); + } finally { + RpcUtils.terminateRpcEndpoint(taskExecutor, timeout); + } + } + + /** + * Test invokable which completes the given future when executed. 
+ */ + public static class TestingInvokable extends AbstractInvokable { + + static BlockerSync sync; + + public TestingInvokable(Environment environment) { + super(environment); + } + + @Override + public void invoke() throws Exception { + sync.block(); + } + } + + private TestingTaskExecutor createTestingTaskExecutor(TaskManagerServices taskManagerServices) throws IOException { + final Configuration configuration = new Configuration(); + return new TestingTaskExecutor( + rpc, + TaskManagerConfiguration.fromConfiguration( + configuration, + TaskExecutorResourceUtils.resourceSpecFromConfigForLocalExecution(configuration), + InetAddress.getLoopbackAddress().getHostAddress()), + haServices, + taskManagerServices, + ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES, + new HeartbeatServices(1_000L, 30_000L), + UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), + null, + new BlobCacheService( + configuration, + new VoidBlobStore(), + null), + testingFatalErrorHandlerResource.getFatalErrorHandler(), + new TestingTaskExecutorPartitionTracker(), + TaskManagerRunner.createBackPressureSampleService(configuration, rpc.getScheduledExecutor())); + } + + private static TaskDeploymentDescriptor createTaskDeploymentDescriptor(JobID jobId) throws IOException { + final ResultPartitionDeploymentDescriptor taskResultPartitionDescriptor = + PartitionTestUtils.createPartitionDeploymentDescriptor(ResultPartitionType.BLOCKING); + + return TaskExecutorSubmissionTest.createTaskDeploymentDescriptor( + jobId, + "job", + taskResultPartitionDescriptor.getShuffleDescriptor().getResultPartitionID().getProducerId(), + new SerializedValue<>(new ExecutionConfig()), + "Sender", + 1, + 0, + 1, + 0, + new Configuration(), + new Configuration(), + TestingInvokable.class.getName(), + Collections.singletonList(taskResultPartitionDescriptor), + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + 0); Review comment: Can we use `TaskDeploymentDescriptorBuilder` 
here? ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterExecutionDeploymentReconciliationTest.java ########## @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; +import org.apache.flink.runtime.jobgraph.utils.JobGraphTestUtils; +import org.apache.flink.runtime.jobmanager.OnCompletionActions; +import org.apache.flink.runtime.jobmaster.utils.JobMasterBuilder; +import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; +import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService; +import org.apache.flink.runtime.messages.Acknowledge; +import 
org.apache.flink.runtime.rpc.TestingRpcService; +import org.apache.flink.runtime.taskexecutor.AccumulatorReport; +import org.apache.flink.runtime.taskexecutor.ExecutionDeploymentReport; +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; +import org.apache.flink.runtime.taskexecutor.TaskExecutorToJobManagerHeartbeatPayload; +import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway; +import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder; +import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; +import org.apache.flink.runtime.taskmanager.LocalUnresolvedTaskManagerLocation; +import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation; +import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource; +import org.apache.flink.util.TestLogger; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; + +import java.util.Collection; +import java.util.Collections; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; + +/** + * Tests for the execution deployment-reconciliation logic in the {@link JobMaster}. 
+ */ +public class JobMasterExecutionDeploymentReconciliationTest extends TestLogger { + + private static final Time testingTimeout = Time.seconds(10L); + + private static TestingRpcService rpcService; + + private final HeartbeatServices heartbeatServices = new HeartbeatServices(Integer.MAX_VALUE, Integer.MAX_VALUE); + + private final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices(); + private final SettableLeaderRetrievalService resourceManagerLeaderRetriever = new SettableLeaderRetrievalService(); + private final TestingLeaderElectionService resourceManagerLeaderElectionService = new TestingLeaderElectionService(); + + @Rule + public final TestingFatalErrorHandlerResource testingFatalErrorHandlerResource = new TestingFatalErrorHandlerResource(); + + @Before + public void setup() { + haServices.setResourceManagerLeaderRetriever(resourceManagerLeaderRetriever); + haServices.setResourceManagerLeaderElectionService(resourceManagerLeaderElectionService); + haServices.setCheckpointRecoveryFactory(new StandaloneCheckpointRecoveryFactory()); + } + + @After + public void shutdown() { + rpcService.clearGateways(); + } + + @BeforeClass + public static void setupClass() { + rpcService = new TestingRpcService(); + } + + @AfterClass + public static void shutdownClass() throws ExecutionException, InterruptedException { + rpcService.stopService().get(); + } + + @Test + public void testExecutionDeploymentReconciliation() throws Exception { + JobMasterBuilder.TestingOnCompletionActions onCompletionActions = new JobMasterBuilder.TestingOnCompletionActions(); + JobMaster jobMaster = createAndStartJobMaster(onCompletionActions); + JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class); + rpcService.registerGateway(jobMasterGateway.getAddress(), jobMasterGateway); + + final JobMasterPartitionReleaseTest.AllocationIdsResourceManagerGateway resourceManagerGateway = createResourceManagerGateway(); Review comment: One could 
make `AllocationIdsResourceManagerGateway` a top level class. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/ExecutionDeploymentReport.java ########## @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskexecutor; + +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; + +import java.util.Set; + +/** + * A report about the currently deployed executions of a TaskExecutor. + */ +public class ExecutionDeploymentReport { Review comment: ```suggestion public class ExecutionDeploymentReport implements Serializable { ``` ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToJobManagerHeartbeatPayload.java ########## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskexecutor; + +import java.util.Collections; + +/** + * Payload for heartbeats sent from the TaskExecutor to the JobManager. + */ +public class TaskExecutorToJobManagerHeartbeatPayload { Review comment: ```suggestion public class TaskExecutorToJobManagerHeartbeatPayload implements Serializable { private static final long serialVersionUID = .... ``` ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterPartitionReleaseTest.java ########## @@ -162,10 +162,10 @@ private void testPartitionReleaseOrPromotionOnJobTermination(Function<TestSetup, } } - private static final class AllocationIdsResourceManagerGateway extends TestingResourceManagerGateway { + public static final class AllocationIdsResourceManagerGateway extends TestingResourceManagerGateway { Review comment: Maybe make this a top level class. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorExecutionDeploymentReconciliationTest.java ########## @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskexecutor; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.testutils.BlockerSync; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.blob.BlobCacheService; +import org.apache.flink.runtime.blob.VoidBlobStore; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; +import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; +import org.apache.flink.runtime.entrypoint.ClusterInformation; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; +import org.apache.flink.runtime.instance.InstanceID; +import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder; +import org.apache.flink.runtime.io.network.partition.PartitionTestUtils; +import 
org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.io.network.partition.TestingTaskExecutorPartitionTracker; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobmaster.AllocatedSlotInfo; +import org.apache.flink.runtime.jobmaster.AllocatedSlotReport; +import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess; +import org.apache.flink.runtime.jobmaster.JobMasterGateway; +import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway; +import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder; +import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; +import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.runtime.rpc.TestingRpcService; +import org.apache.flink.runtime.taskexecutor.slot.TaskSlotUtils; +import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource; +import org.apache.flink.util.SerializedValue; +import org.apache.flink.util.TestLogger; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; + +import java.io.IOException; +import java.net.InetAddress; +import java.util.Collections; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.stream.StreamSupport; + +import static org.hamcrest.collection.IsCollectionWithSize.hasSize; +import static 
org.hamcrest.core.IsCollectionContaining.hasItem; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Tests for the execution deployment-reconciliation logic in the {@link TaskExecutor}. + */ +public class TaskExecutorExecutionDeploymentReconciliationTest extends TestLogger { + + private static final Time timeout = Time.seconds(10L); + + private static TestingRpcService rpc; Review comment: Maybe use `TestingRpcServiceResource`. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorExecutionDeploymentReconciliationTest.java ########## @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.taskexecutor; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.testutils.BlockerSync; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.blob.BlobCacheService; +import org.apache.flink.runtime.blob.VoidBlobStore; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; +import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; +import org.apache.flink.runtime.entrypoint.ClusterInformation; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; +import org.apache.flink.runtime.instance.InstanceID; +import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder; +import org.apache.flink.runtime.io.network.partition.PartitionTestUtils; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.io.network.partition.TestingTaskExecutorPartitionTracker; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobmaster.AllocatedSlotInfo; +import org.apache.flink.runtime.jobmaster.AllocatedSlotReport; +import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess; +import org.apache.flink.runtime.jobmaster.JobMasterGateway; 
+import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway; +import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder; +import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; +import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.runtime.rpc.TestingRpcService; +import org.apache.flink.runtime.taskexecutor.slot.TaskSlotUtils; +import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource; +import org.apache.flink.util.SerializedValue; +import org.apache.flink.util.TestLogger; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; + +import java.io.IOException; +import java.net.InetAddress; +import java.util.Collections; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.stream.StreamSupport; + +import static org.hamcrest.collection.IsCollectionWithSize.hasSize; +import static org.hamcrest.core.IsCollectionContaining.hasItem; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Tests for the execution deployment-reconciliation logic in the {@link TaskExecutor}. 
+ */ +public class TaskExecutorExecutionDeploymentReconciliationTest extends TestLogger { + + private static final Time timeout = Time.seconds(10L); + + private static TestingRpcService rpc; + + private final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices(); + private final SettableLeaderRetrievalService jobManagerLeaderRetriever = new SettableLeaderRetrievalService(); + private final SettableLeaderRetrievalService resourceManagerLeaderRetriever = new SettableLeaderRetrievalService(); + private final JobID jobId = new JobID(); + + @Rule + public final TestingFatalErrorHandlerResource testingFatalErrorHandlerResource = new TestingFatalErrorHandlerResource(); + + @Before + public void setup() { + haServices.setResourceManagerLeaderRetriever(resourceManagerLeaderRetriever); + haServices.setJobMasterLeaderRetriever(jobId, jobManagerLeaderRetriever); + } + + @After + public void shutdown() { + rpc.clearGateways(); + } + + @BeforeClass + public static void setupClass() { + rpc = new TestingRpcService(); + } + + @AfterClass + public static void shutdownClass() throws ExecutionException, InterruptedException { + rpc.stopService().get(); + } + + @Test + public void testDeployedExecutionReporting() throws Exception { + final OneShotLatch slotOfferLatch = new OneShotLatch(); + final BlockingQueue<Set<ExecutionAttemptID>> deployedExecutionsFuture = new ArrayBlockingQueue<>(3); + final CompletableFuture<Void> taskFinishedFuture = new CompletableFuture<>(); + final ResourceID jobManagerResourceId = ResourceID.generate(); + final TestingJobMasterGateway jobMasterGateway = setupJobManagerGateway(slotOfferLatch, deployedExecutionsFuture, taskFinishedFuture, jobManagerResourceId); + + final CompletableFuture<SlotReport> initialSlotReportFuture = new CompletableFuture<>(); + final TestingResourceManagerGateway testingResourceManagerGateway = setupResourceManagerGateway(initialSlotReportFuture); + + final TaskManagerServices taskManagerServices = new 
TaskManagerServicesBuilder() + .setTaskSlotTable(TaskSlotUtils.createTaskSlotTable(1, timeout)) + .setShuffleEnvironment(new NettyShuffleEnvironmentBuilder().build()) + .build(); + + final TestingTaskExecutor taskExecutor = createTestingTaskExecutor(taskManagerServices); + + try { + taskExecutor.start(); + taskExecutor.waitUntilStarted(); + + final TaskExecutorGateway taskExecutorGateway = taskExecutor.getSelfGateway(TaskExecutorGateway.class); + + final TaskDeploymentDescriptor taskDeploymentDescriptor = createTaskDeploymentDescriptor(jobId); + + connectComponentsAndRequestSlot(jobMasterGateway, testingResourceManagerGateway, taskExecutorGateway, taskManagerServices.getJobLeaderService(), initialSlotReportFuture, taskDeploymentDescriptor.getAllocationId()); + + TestingInvokable.sync = new BlockerSync(); + + // This ensures TM has been successfully registered to JM. + slotOfferLatch.await(); + + AllocatedSlotReport slotAllocationReport = new AllocatedSlotReport(jobId, Collections.singleton(new AllocatedSlotInfo(0, taskDeploymentDescriptor.getAllocationId()))); + + // nothing as deployed, so the deployment report should be empty + taskExecutorGateway.heartbeatFromJobManager(jobManagerResourceId, slotAllocationReport); + assertThat(deployedExecutionsFuture.take(), hasSize(0)); + + taskExecutorGateway.submitTask(taskDeploymentDescriptor, jobMasterGateway.getFencingToken(), timeout) + .get(); + + TestingInvokable.sync.awaitBlocker(); + + // task is deployed, so the deployment report should contain it + taskExecutorGateway.heartbeatFromJobManager(jobManagerResourceId, slotAllocationReport); + assertThat(deployedExecutionsFuture.take(), hasItem(taskDeploymentDescriptor.getExecutionAttemptId())); + + TestingInvokable.sync.releaseBlocker(); + + // task is finished ans was cleaned up, so the deployment report should be empty + taskFinishedFuture.get(); + taskExecutorGateway.heartbeatFromJobManager(jobManagerResourceId, slotAllocationReport); + 
assertThat(deployedExecutionsFuture.take(), hasSize(0)); + } finally { + RpcUtils.terminateRpcEndpoint(taskExecutor, timeout); + } + } + + /** + * Test invokable which completes the given future when executed. + */ + public static class TestingInvokable extends AbstractInvokable { + + static BlockerSync sync; + + public TestingInvokable(Environment environment) { + super(environment); + } + + @Override + public void invoke() throws Exception { + sync.block(); + } + } + + private TestingTaskExecutor createTestingTaskExecutor(TaskManagerServices taskManagerServices) throws IOException { + final Configuration configuration = new Configuration(); + return new TestingTaskExecutor( + rpc, + TaskManagerConfiguration.fromConfiguration( + configuration, + TaskExecutorResourceUtils.resourceSpecFromConfigForLocalExecution(configuration), + InetAddress.getLoopbackAddress().getHostAddress()), + haServices, + taskManagerServices, + ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES, + new HeartbeatServices(1_000L, 30_000L), + UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), + null, + new BlobCacheService( + configuration, + new VoidBlobStore(), + null), + testingFatalErrorHandlerResource.getFatalErrorHandler(), + new TestingTaskExecutorPartitionTracker(), + TaskManagerRunner.createBackPressureSampleService(configuration, rpc.getScheduledExecutor())); + } + + private static TaskDeploymentDescriptor createTaskDeploymentDescriptor(JobID jobId) throws IOException { + final ResultPartitionDeploymentDescriptor taskResultPartitionDescriptor = + PartitionTestUtils.createPartitionDeploymentDescriptor(ResultPartitionType.BLOCKING); + + return TaskExecutorSubmissionTest.createTaskDeploymentDescriptor( + jobId, + "job", + taskResultPartitionDescriptor.getShuffleDescriptor().getResultPartitionID().getProducerId(), + new SerializedValue<>(new ExecutionConfig()), + "Sender", + 1, + 0, + 1, + 0, + new Configuration(), + new Configuration(), + 
TestingInvokable.class.getName(), + Collections.singletonList(taskResultPartitionDescriptor), + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + 0); + } + + private static TestingJobMasterGateway setupJobManagerGateway(OneShotLatch slotOfferLatch, BlockingQueue<Set<ExecutionAttemptID>> deployedExecutionsFuture, CompletableFuture<Void> taskFinishedFuture, ResourceID jobManagerResourceId) { + return new TestingJobMasterGatewayBuilder() + .setRegisterTaskManagerFunction((s, location) -> CompletableFuture.completedFuture(new JMTMRegistrationSuccess(jobManagerResourceId))) + .setOfferSlotsFunction((resourceID, slotOffers) -> { + slotOfferLatch.trigger(); + return CompletableFuture.completedFuture(slotOffers); + }) + .setTaskManagerHeartbeatConsumer((resourceID, taskExecutorToJobManagerHeartbeatPayload) -> { + ExecutionDeploymentReport executionDeploymentReport = taskExecutorToJobManagerHeartbeatPayload.getExecutionDeploymentReport(); + deployedExecutionsFuture.add(executionDeploymentReport.getExecutions()); + }) + .setUpdateTaskExecutionStateFunction(taskExecutionState -> { + if (taskExecutionState.getExecutionState() == ExecutionState.FINISHED) { + taskFinishedFuture.complete(null); + } + return CompletableFuture.completedFuture(Acknowledge.get()); + }) + .build(); + } + + private static TestingResourceManagerGateway setupResourceManagerGateway(CompletableFuture<SlotReport> initialSlotReportFuture) { + final TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway(); + testingResourceManagerGateway.setSendSlotReportFunction(resourceIDInstanceIDSlotReportTuple3 -> { + initialSlotReportFuture.complete(resourceIDInstanceIDSlotReportTuple3.f2); + return CompletableFuture.completedFuture(Acknowledge.get()); + }); + testingResourceManagerGateway.setRegisterTaskExecutorFunction(input -> CompletableFuture.completedFuture( + new TaskExecutorRegistrationSuccess( + new InstanceID(), + 
testingResourceManagerGateway.getOwnResourceId(), + new ClusterInformation("blobServerHost", 55555)))); + return testingResourceManagerGateway; + } + + private void connectComponentsAndRequestSlot( + JobMasterGateway jobMasterGateway, + ResourceManagerGateway resourceManagerGateway, + TaskExecutorGateway taskExecutorGateway, + JobLeaderService jobLeaderService, + CompletableFuture<SlotReport> initialSlotReportFuture, + AllocationID allocationId) throws Exception { + final String jobMasterAddress = "jm"; + rpc.registerGateway(jobMasterAddress, jobMasterGateway); + rpc.registerGateway(resourceManagerGateway.getAddress(), resourceManagerGateway); + + // inform the task manager about the job leader + jobLeaderService.addJob(jobId, jobMasterAddress); + jobManagerLeaderRetriever.notifyListener(jobMasterAddress, UUID.randomUUID()); + resourceManagerLeaderRetriever.notifyListener(resourceManagerGateway.getAddress(), resourceManagerGateway.getFencingToken().toUUID()); + + final Optional<SlotStatus> slotStatusOptional = StreamSupport.stream(initialSlotReportFuture.get().spliterator(), false) + .findAny(); + + assertTrue(slotStatusOptional.isPresent()); + + while (true) { + try { + taskExecutorGateway.requestSlot( + slotStatusOptional.get().getSlotID(), + jobId, + allocationId, + ResourceProfile.ZERO, + jobMasterAddress, + resourceManagerGateway.getFencingToken(), + timeout + ).get(); + break; + } catch (Exception e) { + // the proper establishment of the RM connection is tracked + // asynchronously, so we have to poll here until it went through + // until then, slot requests will fail with an exception + Thread.sleep(50); + } + } Review comment: If you want to wait until the `TaskExecutor` has established the connection to the `ResourceManager`, you can wait for the `ResourceManagerGateway.sendSlotReport` call.
########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorExecutionDeploymentReconciliationTest.java ########## @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskexecutor; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.testutils.BlockerSync; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.blob.BlobCacheService; +import org.apache.flink.runtime.blob.VoidBlobStore; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; +import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; +import org.apache.flink.runtime.entrypoint.ClusterInformation; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.execution.ExecutionState; +import 
org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; +import org.apache.flink.runtime.instance.InstanceID; +import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder; +import org.apache.flink.runtime.io.network.partition.PartitionTestUtils; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.io.network.partition.TestingTaskExecutorPartitionTracker; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobmaster.AllocatedSlotInfo; +import org.apache.flink.runtime.jobmaster.AllocatedSlotReport; +import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess; +import org.apache.flink.runtime.jobmaster.JobMasterGateway; +import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway; +import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder; +import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; +import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.runtime.rpc.TestingRpcService; +import org.apache.flink.runtime.taskexecutor.slot.TaskSlotUtils; +import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource; +import org.apache.flink.util.SerializedValue; +import org.apache.flink.util.TestLogger; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; + 
+import java.io.IOException; +import java.net.InetAddress; +import java.util.Collections; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.stream.StreamSupport; + +import static org.hamcrest.collection.IsCollectionWithSize.hasSize; +import static org.hamcrest.core.IsCollectionContaining.hasItem; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Tests for the execution deployment-reconciliation logic in the {@link TaskExecutor}. + */ +public class TaskExecutorExecutionDeploymentReconciliationTest extends TestLogger { + + private static final Time timeout = Time.seconds(10L); + + private static TestingRpcService rpc; + + private final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices(); + private final SettableLeaderRetrievalService jobManagerLeaderRetriever = new SettableLeaderRetrievalService(); + private final SettableLeaderRetrievalService resourceManagerLeaderRetriever = new SettableLeaderRetrievalService(); + private final JobID jobId = new JobID(); + + @Rule + public final TestingFatalErrorHandlerResource testingFatalErrorHandlerResource = new TestingFatalErrorHandlerResource(); + + @Before + public void setup() { + haServices.setResourceManagerLeaderRetriever(resourceManagerLeaderRetriever); + haServices.setJobMasterLeaderRetriever(jobId, jobManagerLeaderRetriever); + } + + @After + public void shutdown() { + rpc.clearGateways(); + } + + @BeforeClass + public static void setupClass() { + rpc = new TestingRpcService(); + } + + @AfterClass + public static void shutdownClass() throws ExecutionException, InterruptedException { + rpc.stopService().get(); + } + + @Test + public void testDeployedExecutionReporting() throws Exception { + final OneShotLatch 
slotOfferLatch = new OneShotLatch(); + final BlockingQueue<Set<ExecutionAttemptID>> deployedExecutionsFuture = new ArrayBlockingQueue<>(3); Review comment: ```suggestion final BlockingQueue<Set<ExecutionAttemptID>> deployedExecutionsQueue = new ArrayBlockingQueue<>(3); ``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
