zentol commented on a change in pull request #13319:
URL: https://github.com/apache/flink/pull/13319#discussion_r508445101



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java
##########
@@ -144,6 +148,58 @@ public void testGenerateTaskManagerResourceIDWithLocalRpcService() throws Except
                assertThat(taskManagerResourceID.getResourceIdString(), containsString(InetAddress.getLocalHost().getHostName()));
        }
 
+       @Test
+       public void testUnexpectedTaskManagerTerminationFailsRunnerFatally() throws Exception {
+               final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
+               final TestingTaskExecutorService taskExecutorService = TestingTaskExecutorService.newBuilder()
+                               .setTerminationFuture(terminationFuture)
+                               .build();
+               final TaskManagerRunner taskManagerRunner = createTaskManagerRunner(
+                               createConfiguration(),
+                               (configuration,
+                               resourceID,
+                               rpcService,
+                               highAvailabilityServices,
+                               heartbeatServices,
+                               metricRegistry,
+                               blobCacheService,
+                               localCommunicationOnly,
+                               externalResourceInfoProvider,
+                               fatalErrorHandler) -> taskExecutorService);

Review comment:
       Maybe introduce a utility method that returns this factory; it is easy 
to miss that the list of arguments is part of the factory and not of the 
constructor.
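
A minimal sketch of what such a helper could look like. It is self-contained and deliberately does not use the real Flink types: `Service`, `ServiceFactory`, `createServiceFactory`, and the three-argument signature are hypothetical stand-ins for the ten-argument factory in the test above.

```java
public class FactoryHelperSketch {

    /** Hypothetical stand-in for the service that the factory produces. */
    public static final class Service {
        public final String name;

        public Service(String name) {
            this.name = name;
        }
    }

    /** Hypothetical stand-in for the multi-argument factory interface. */
    @FunctionalInterface
    public interface ServiceFactory {
        Service create(String configuration, String resourceId, boolean localCommunicationOnly);
    }

    /**
     * Returns a factory that ignores all of its arguments and always hands back
     * the given service, so the long parameter list lives in exactly one place.
     */
    public static ServiceFactory createServiceFactory(Service service) {
        return (configuration, resourceId, localCommunicationOnly) -> service;
    }

    public static void main(String[] args) {
        Service testingService = new Service("testing");
        ServiceFactory factory = createServiceFactory(testingService);

        // The factory returns the very instance it was created with.
        Service created = factory.create("config", "resource-1", true);
        System.out.println(created == testingService); // prints true
    }
}
```

With a helper like this, the call site would shrink to passing `createServiceFactory(taskExecutorService)` as a single argument, which makes it obvious where the factory ends and the remaining arguments begin.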

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/AbstractDispatcherLeaderProcess.java
##########
@@ -139,6 +139,8 @@ public final UUID getLeaderSessionId() {
        private void closeInternal() {
                log.info("Stopping {}.", getClass().getSimpleName());
 
+               state = State.STOPPED;

Review comment:
       Does this change belong in `cd14c5907bdbd88861f9e74448ded52d5c15a7a6`? 
(Because it requires other components to be stopped after the leader process 
has transitioned to STOPPED.)

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponentTest.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint.component;
+
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.dispatcher.runner.TestingDispatcherRunner;
+import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.core.testutils.FlinkMatchers.containsCause;
+import static org.apache.flink.core.testutils.FlinkMatchers.willNotComplete;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the {@link DispatcherResourceManagerComponent}.
+ */
+public class DispatcherResourceManagerComponentTest extends TestLogger {
+
+       @Test
+       public void unexpectedResourceManagerTermination_failsFatally() {
+               final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
+               final TestingFatalErrorHandler fatalErrorHandler = new TestingFatalErrorHandler();
+               final TestingResourceManagerService resourceManagerService = TestingResourceManagerService
+                       .newBuilder()
+                       .setTerminationFuture(terminationFuture)
+                       .build();
+
+               createDispatcherResourceManagerComponent(fatalErrorHandler, resourceManagerService);
+
+               final FlinkException expectedException = new FlinkException("Expected test exception.");
+
+               terminationFuture.completeExceptionally(expectedException);
+
+               final Throwable error = fatalErrorHandler.getException();
+               assertThat(error, containsCause(expectedException));
+       }
+
+       private DispatcherResourceManagerComponent createDispatcherResourceManagerComponent(
+                       TestingFatalErrorHandler fatalErrorHandler,
+                       TestingResourceManagerService resourceManagerService) {
+               return new DispatcherResourceManagerComponent(
+                       TestingDispatcherRunner.newBuilder().build(),
+                               resourceManagerService,
+                       new SettableLeaderRetrievalService(),
+                       new SettableLeaderRetrievalService(),
+                               FutureUtils::completedVoidFuture,
+                               fatalErrorHandler);

Review comment:
       Odd indentation: the constructor arguments are indented inconsistently.

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java
##########
@@ -144,6 +148,58 @@ public void testGenerateTaskManagerResourceIDWithLocalRpcService() throws Except
                assertThat(taskManagerResourceID.getResourceIdString(), containsString(InetAddress.getLocalHost().getHostName()));
        }
 
+       @Test
+       public void testUnexpectedTaskManagerTerminationFailsRunnerFatally() throws Exception {
+               final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
+               final TestingTaskExecutorService taskExecutorService = TestingTaskExecutorService.newBuilder()
+                               .setTerminationFuture(terminationFuture)
+                               .build();
+               final TaskManagerRunner taskManagerRunner = createTaskManagerRunner(
+                               createConfiguration(),
+                               (configuration,
+                               resourceID,
+                               rpcService,
+                               highAvailabilityServices,
+                               heartbeatServices,
+                               metricRegistry,
+                               blobCacheService,
+                               localCommunicationOnly,
+                               externalResourceInfoProvider,
+                               fatalErrorHandler) -> taskExecutorService);
+
+               terminationFuture.completeExceptionally(new FlinkException("Test exception."));
+
+               Integer statusCode = systemExitTrackingSecurityManager.getSystemExitFuture().get();
+               assertThat(statusCode, is(equalTo(TaskManagerRunner.RUNTIME_FAILURE_RETURN_CODE)));
+       }
+
+       @Test
+       public void testUnexpectedTaskManagerTerminationAfterRunnerCloseWillBeIgnored() throws Exception {
+               final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
+               final TestingTaskExecutorService taskExecutorService = TestingTaskExecutorService.newBuilder()
+                               .setTerminationFuture(terminationFuture)
+                               .withManualTerminationFutureCompletion()
+                               .build();
+               final TaskManagerRunner taskManagerRunner = createTaskManagerRunner(
+                               createConfiguration(),
+                               (configuration,
+                               resourceID,
+                               rpcService,
+                               highAvailabilityServices,
+                               heartbeatServices,
+                               metricRegistry,
+                               blobCacheService,
+                               localCommunicationOnly,
+                               externalResourceInfoProvider,
+                               fatalErrorHandler) -> taskExecutorService);

Review comment:
       Same as above: consider a utility method that returns this factory.



