xintongsong commented on a change in pull request #13004: URL: https://github.com/apache/flink/pull/13004#discussion_r468507810
########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java ########## @@ -0,0 +1,428 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.resourcemanager.active; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec; +import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.entrypoint.ClusterInformation; +import org.apache.flink.runtime.instance.HardwareDescription; +import org.apache.flink.runtime.io.network.partition.NoOpResourceManagerPartitionTracker; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.apache.flink.runtime.registration.RegistrationResponse; +import org.apache.flink.runtime.resourcemanager.TaskExecutorRegistration; +import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec; +import 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; +import org.apache.flink.runtime.resourcemanager.slotmanager.TestingSlotManagerBuilder; +import org.apache.flink.runtime.resourcemanager.utils.MockResourceManagerRuntimeServices; +import org.apache.flink.runtime.rpc.TestingRpcService; +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; +import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder; +import org.apache.flink.runtime.util.TestingFatalErrorHandler; +import org.apache.flink.util.TestLogger; +import org.apache.flink.util.function.RunnableWithException; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThan; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; + +/** + * Tests for {@link ActiveResourceManager}. + */ +public class ActiveResourceManagerTest extends TestLogger { + + private static final long TIMEOUT_SEC = 5L; + private static final Time TIMEOUT_TIME = Time.seconds(TIMEOUT_SEC); + + private static final WorkerResourceSpec WORKER_RESOURCE_SPEC = WorkerResourceSpec.ZERO; + + /** + * Tests worker successfully requested, started and registered. 
+ */ + @Test + public void testStartNewWorker() throws Exception { + new Context() {{ + final ResourceID tmResourceId = ResourceID.generate(); + final CompletableFuture<TaskExecutorProcessSpec> requestWorkerFromDriverFuture = new CompletableFuture<>(); + + driverBuilder.setRequestResourceFunction(taskExecutorProcessSpec -> { + requestWorkerFromDriverFuture.complete(taskExecutorProcessSpec); + return CompletableFuture.completedFuture(tmResourceId); + }); + + runTest(() -> { + // received worker request, verify requesting from driver + CompletableFuture<Boolean> startNewWorkerFuture = runInMainThread(() -> + getResourceManager().startNewWorker(WORKER_RESOURCE_SPEC)); + TaskExecutorProcessSpec taskExecutorProcessSpec = requestWorkerFromDriverFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS); + + assertThat(startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), is(true)); + assertThat(taskExecutorProcessSpec, + is(TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, WORKER_RESOURCE_SPEC))); + + // worker registered, verify registration succeed + CompletableFuture<RegistrationResponse> registerTaskExecutorFuture = registerTaskExecutor(tmResourceId); + assertThat(registerTaskExecutorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), instanceOf(RegistrationResponse.Success.class)); + }); + }}; + } + + /** + * Tests worker failed while requesting. 
+ */ + @Test + public void testStartNewWorkerFailedRequesting() throws Exception { + new Context() {{ + final ResourceID tmResourceId = ResourceID.generate(); + final AtomicInteger requestCount = new AtomicInteger(0); + + final List<CompletableFuture<ResourceID>> resourceIdFutures = new ArrayList<>(); + resourceIdFutures.add(new CompletableFuture<>()); + resourceIdFutures.add(new CompletableFuture<>()); + + final List<CompletableFuture<TaskExecutorProcessSpec>> requestWorkerFromDriverFutures = new ArrayList<>(); + requestWorkerFromDriverFutures.add(new CompletableFuture<>()); + requestWorkerFromDriverFutures.add(new CompletableFuture<>()); + + driverBuilder.setRequestResourceFunction(taskExecutorProcessSpec -> { + int idx = requestCount.getAndIncrement(); + assertThat(idx, lessThan(2)); + + requestWorkerFromDriverFutures.get(idx).complete(taskExecutorProcessSpec); + return resourceIdFutures.get(idx); + }); + + slotManagerBuilder.setGetRequiredResourcesSupplier(() -> Collections.singletonMap(WORKER_RESOURCE_SPEC, 1)); + + runTest(() -> { + // received worker request, verify requesting from driver + CompletableFuture<Boolean> startNewWorkerFuture = runInMainThread(() -> + getResourceManager().startNewWorker(WORKER_RESOURCE_SPEC)); + TaskExecutorProcessSpec taskExecutorProcessSpec1 = requestWorkerFromDriverFutures.get(0).get(TIMEOUT_SEC, TimeUnit.SECONDS); + + assertThat(startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), is(true)); + assertThat(taskExecutorProcessSpec1, + is(TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, WORKER_RESOURCE_SPEC))); + + // first request failed, verify requesting another worker from driver + runInMainThread(() -> resourceIdFutures.get(0).completeExceptionally(new Throwable("testing error"))); + TaskExecutorProcessSpec taskExecutorProcessSpec2 = + requestWorkerFromDriverFutures.get(1).get(TIMEOUT_SEC, TimeUnit.SECONDS); + + assertThat(taskExecutorProcessSpec2, is(taskExecutorProcessSpec1)); + + // second 
request allocated, verify registration succeed + runInMainThread(() -> resourceIdFutures.get(1).complete(tmResourceId)); + CompletableFuture<RegistrationResponse> registerTaskExecutorFuture = registerTaskExecutor(tmResourceId); + assertThat(registerTaskExecutorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), instanceOf(RegistrationResponse.Success.class)); + }); + }}; + } + + /** + * Tests worker terminated after requested before registered. + */ + @Test + public void testWorkerTerminatedBeforeRegister() throws Exception { + new Context() {{ + final AtomicInteger requestCount = new AtomicInteger(0); + + final List<ResourceID> tmResourceIds = new ArrayList<>(); + tmResourceIds.add(ResourceID.generate()); + tmResourceIds.add(ResourceID.generate()); + + final List<CompletableFuture<TaskExecutorProcessSpec>> requestWorkerFromDriverFutures = new ArrayList<>(); + requestWorkerFromDriverFutures.add(new CompletableFuture<>()); + requestWorkerFromDriverFutures.add(new CompletableFuture<>()); + + driverBuilder.setRequestResourceFunction(taskExecutorProcessSpec -> { + int idx = requestCount.getAndIncrement(); + assertThat(idx, lessThan(2)); + + requestWorkerFromDriverFutures.get(idx).complete(taskExecutorProcessSpec); + return CompletableFuture.completedFuture(tmResourceIds.get(idx)); + }); + + slotManagerBuilder.setGetRequiredResourcesSupplier(() -> Collections.singletonMap(WORKER_RESOURCE_SPEC, 1)); + + runTest(() -> { + // received worker request, verify requesting from driver + CompletableFuture<Boolean> startNewWorkerFuture = runInMainThread(() -> + getResourceManager().startNewWorker(WORKER_RESOURCE_SPEC)); + TaskExecutorProcessSpec taskExecutorProcessSpec1 = requestWorkerFromDriverFutures.get(0).get(TIMEOUT_SEC, TimeUnit.SECONDS); + + assertThat(startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), is(true)); + assertThat(taskExecutorProcessSpec1, + is(TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, WORKER_RESOURCE_SPEC))); + + // first worker 
failed before register, verify requesting another worker from driver + runInMainThread(() -> getResourceManager().onWorkerTerminated(tmResourceIds.get(0))); + TaskExecutorProcessSpec taskExecutorProcessSpec2 = + requestWorkerFromDriverFutures.get(1).get(TIMEOUT_SEC, TimeUnit.SECONDS); + + assertThat(taskExecutorProcessSpec2, is(taskExecutorProcessSpec1)); + + // second worker registered, verify registration succeed + CompletableFuture<RegistrationResponse> registerTaskExecutorFuture = registerTaskExecutor(tmResourceIds.get(1)); + assertThat(registerTaskExecutorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), instanceOf(RegistrationResponse.Success.class)); + }); + }}; + } + + /** + * Tests worker terminated after registered. + */ + @Test + public void testWorkerTerminatedAfterRegister() throws Exception { + new Context() {{ + final AtomicInteger requestCount = new AtomicInteger(0); + + final List<ResourceID> tmResourceIds = new ArrayList<>(); + tmResourceIds.add(ResourceID.generate()); + tmResourceIds.add(ResourceID.generate()); + + final List<CompletableFuture<TaskExecutorProcessSpec>> requestWorkerFromDriverFutures = new ArrayList<>(); + requestWorkerFromDriverFutures.add(new CompletableFuture<>()); + requestWorkerFromDriverFutures.add(new CompletableFuture<>()); + + driverBuilder.setRequestResourceFunction(taskExecutorProcessSpec -> { + int idx = requestCount.getAndIncrement(); + assertThat(idx, lessThan(2)); + + requestWorkerFromDriverFutures.get(idx).complete(taskExecutorProcessSpec); + return CompletableFuture.completedFuture(tmResourceIds.get(idx)); + }); + + slotManagerBuilder.setGetRequiredResourcesSupplier(() -> Collections.singletonMap(WORKER_RESOURCE_SPEC, 1)); + + runTest(() -> { + // received worker request, verify requesting from driver + CompletableFuture<Boolean> startNewWorkerFuture = runInMainThread(() -> + getResourceManager().startNewWorker(WORKER_RESOURCE_SPEC)); + TaskExecutorProcessSpec taskExecutorProcessSpec1 = 
requestWorkerFromDriverFutures.get(0).get(TIMEOUT_SEC, TimeUnit.SECONDS); + + assertThat(startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), is(true)); + assertThat(taskExecutorProcessSpec1, + is(TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, WORKER_RESOURCE_SPEC))); + + // first worker registered, verify registration succeed + CompletableFuture<RegistrationResponse> registerTaskExecutorFuture1 = registerTaskExecutor(tmResourceIds.get(0)); + assertThat(registerTaskExecutorFuture1.get(TIMEOUT_SEC, TimeUnit.SECONDS), instanceOf(RegistrationResponse.Success.class)); + + // first worker terminated, verify requesting another worker from driver + runInMainThread(() -> getResourceManager().onWorkerTerminated(tmResourceIds.get(0))); + TaskExecutorProcessSpec taskExecutorProcessSpec2 = + requestWorkerFromDriverFutures.get(1).get(TIMEOUT_SEC, TimeUnit.SECONDS); + + assertThat(taskExecutorProcessSpec2, is(taskExecutorProcessSpec1)); + + // second worker registered, verify registration succeed + CompletableFuture<RegistrationResponse> registerTaskExecutorFuture2 = registerTaskExecutor(tmResourceIds.get(1)); + assertThat(registerTaskExecutorFuture2.get(TIMEOUT_SEC, TimeUnit.SECONDS), instanceOf(RegistrationResponse.Success.class)); + }); + }}; + } + + /** + * Tests worker terminated and is no longer required. 
+ */ + @Test + public void testWorkerTerminatedNoLongerRequired() throws Exception { + new Context() {{ + final ResourceID tmResourceId = ResourceID.generate(); + final AtomicInteger requestCount = new AtomicInteger(0); + + final List<CompletableFuture<TaskExecutorProcessSpec>> requestWorkerFromDriverFutures = new ArrayList<>(); + requestWorkerFromDriverFutures.add(new CompletableFuture<>()); + requestWorkerFromDriverFutures.add(new CompletableFuture<>()); + + driverBuilder.setRequestResourceFunction(taskExecutorProcessSpec -> { + int idx = requestCount.getAndIncrement(); + assertThat(idx, lessThan(2)); + + requestWorkerFromDriverFutures.get(idx).complete(taskExecutorProcessSpec); + return CompletableFuture.completedFuture(tmResourceId); + }); + + runTest(() -> { + // received worker request, verify requesting from driver + CompletableFuture<Boolean> startNewWorkerFuture = runInMainThread(() -> + getResourceManager().startNewWorker(WORKER_RESOURCE_SPEC)); + TaskExecutorProcessSpec taskExecutorProcessSpec = requestWorkerFromDriverFutures.get(0).get(TIMEOUT_SEC, TimeUnit.SECONDS); + + assertThat(startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), is(true)); + assertThat(taskExecutorProcessSpec, + is(TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, WORKER_RESOURCE_SPEC))); + + // worker registered, verify registration succeed + CompletableFuture<RegistrationResponse> registerTaskExecutorFuture = registerTaskExecutor(tmResourceId); + assertThat(registerTaskExecutorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), instanceOf(RegistrationResponse.Success.class)); + + // worker terminated, verify not requesting new worker + runInMainThread(() -> { + getResourceManager().onWorkerTerminated(tmResourceId); + // needs to return something, so that we can use `get()` to make sure the main thread processing + // finishes before the assertions + return null; + }).get(TIMEOUT_SEC, TimeUnit.SECONDS); + 
assertFalse(requestWorkerFromDriverFutures.get(1).isDone()); + }); + }}; + } + + /** + * Tests workers from previous attempt successfully recovered and registered. + */ + @Test + public void testRecoverWorkerFromPreviousAttempt() throws Exception { + new Context() {{ + final ResourceID tmResourceId = ResourceID.generate(); + + runTest(() -> { + runInMainThread(() -> getResourceManager().onPreviousAttemptWorkersRecovered(Collections.singleton(tmResourceId))); + CompletableFuture<RegistrationResponse> registerTaskExecutorFuture = registerTaskExecutor(tmResourceId); + assertThat(registerTaskExecutorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), instanceOf(RegistrationResponse.Success.class)); + }); + }}; + } + + /** + * Tests decline unknown worker registration. + */ + @Test + public void testRegisterUnknownWorker() throws Exception { + new Context() {{ + runTest(() -> { + CompletableFuture<RegistrationResponse> registerTaskExecutorFuture = registerTaskExecutor(ResourceID.generate()); + assertThat(registerTaskExecutorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), instanceOf(RegistrationResponse.Decline.class)); + }); + }}; + } + + @Test + public void testOnError() throws Exception { + new Context() {{ + final Throwable fatalError = new Throwable("Testing fatal error"); + runTest(() -> { + runInMainThread(() -> getResourceManager().onError(fatalError)); + final Throwable reportedError = getFatalErrorHandler().getErrorFuture().get(TIMEOUT_SEC, TimeUnit.SECONDS); + assertThat(reportedError, is(fatalError)); + }); + }}; + } + + class Context { + + final Configuration flinkConfig = new Configuration(); + final TestingResourceManagerDriver.Builder driverBuilder = new TestingResourceManagerDriver.Builder(); + final TestingSlotManagerBuilder slotManagerBuilder = new TestingSlotManagerBuilder(); + + private ActiveResourceManager<ResourceID> resourceManager; + private TestingFatalErrorHandler fatalErrorHandler; + + ActiveResourceManager<ResourceID> getResourceManager() { + return 
resourceManager; + } + + TestingFatalErrorHandler getFatalErrorHandler() { + return fatalErrorHandler; + } + + void runTest(RunnableWithException testMethod) throws Exception { + fatalErrorHandler = new TestingFatalErrorHandler(); + resourceManager = createAndStartResourceManager( + flinkConfig, + driverBuilder.build(), + slotManagerBuilder.createSlotManager()); + + try { + testMethod.run(); + } finally { + resourceManager.close(); + } + } + + private ActiveResourceManager<ResourceID> createAndStartResourceManager( + Configuration configuration, + ResourceManagerDriver<ResourceID> driver, + SlotManager slotManager) throws Exception { + final TestingRpcService rpcService = new TestingRpcService(configuration); Review comment: The `configuration` used for creating the `TestingRpcService` may differ between test cases. It's probably not a problem, since the chance that `configuration` contains something that affects the `TestingRpcService` is small. But I would still like to make it safe, so that the `TestingRpcService` does not depend on per-test configuration. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
