This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 45c661e02de25e315a81473586782519c22942fc Author: Till Rohrmann <trohrm...@apache.org> AuthorDate: Thu Nov 18 21:25:43 2021 +0100 [FLINK-25817] Test that the TaskExecutor can recover persisted allocation ids --- .../runtime/taskexecutor/TaskExecutorBuilder.java | 176 ++++++++++++++++++ .../taskexecutor/TaskExecutorRecoveryTest.java | 206 +++++++++++++++++++++ .../testutils/WorkingDirectoryExtension.java | 8 +- 3 files changed, 387 insertions(+), 3 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorBuilder.java new file mode 100644 index 0000000..7fc65bc --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorBuilder.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskexecutor; + +import org.apache.flink.api.common.resources.CPUResource; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.MemorySize; +import org.apache.flink.runtime.blob.BlobCacheService; +import org.apache.flink.runtime.blob.NoOpTaskExecutorBlobService; +import org.apache.flink.runtime.blob.TaskExecutorBlobService; +import org.apache.flink.runtime.blob.VoidPermanentBlobService; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.entrypoint.WorkingDirectory; +import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.io.network.partition.TaskExecutorPartitionTracker; +import org.apache.flink.runtime.io.network.partition.TestingTaskExecutorPartitionTracker; +import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.apache.flink.runtime.rest.util.NoOpFatalErrorHandler; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.util.concurrent.Executors; + +import javax.annotation.Nullable; + +import java.util.Collections; + +/** Builder for testing {@link TaskExecutor}. */ +public class TaskExecutorBuilder { + + private final RpcService rpcService; + + private final HighAvailabilityServices haServices; + + private ResourceID resourceId = ResourceID.generate(); + + private Configuration configuration = new Configuration(); + + @Nullable private TaskManagerConfiguration taskManagerConfiguration = null; + + @Nullable private TaskManagerServices taskManagerServices = null; + + private ExternalResourceInfoProvider externalResourceInfoProvider = + ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES; + + private HeartbeatServices heartbeatServices = new HeartbeatServices(1 << 20, 1 << 20); + + private TaskManagerMetricGroup taskManagerMetricGroup = + UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(); + + @Nullable private String metricQueryServiceAddress = null; + + @Nullable private BlobCacheService taskExecutorBlobService = null; + + private FatalErrorHandler fatalErrorHandler = NoOpFatalErrorHandler.INSTANCE; + + private TaskExecutorPartitionTracker partitionTracker = + new TestingTaskExecutorPartitionTracker(); + + private TaskExecutorResourceSpec taskExecutorResourceSpec = + new TaskExecutorResourceSpec( + new CPUResource(1.0), + MemorySize.ofMebiBytes(1024), + MemorySize.ofMebiBytes(1024), + MemorySize.ofMebiBytes(1024), + MemorySize.ofMebiBytes(1024), + Collections.emptyList()); + + private final WorkingDirectory workingDirectory; + + private TaskExecutorBuilder( + RpcService rpcService, + HighAvailabilityServices haServices, + WorkingDirectory workingDirectory) { + this.rpcService = rpcService; + this.haServices = haServices; + this.workingDirectory = workingDirectory; + } + + public TaskExecutorBuilder setConfiguration(Configuration configuration) { + this.configuration = configuration; + return this; + } + + public TaskExecutorBuilder setResourceId(ResourceID resourceId) { + this.resourceId = resourceId; + return this; + } + + public TaskExecutor build() throws Exception { + + final TaskExecutorBlobService resolvedTaskExecutorBlobService; + + TaskExecutorResourceUtils.adjustForLocalExecution(configuration); + + if (taskExecutorBlobService == null) { + resolvedTaskExecutorBlobService = NoOpTaskExecutorBlobService.INSTANCE; + } else { + resolvedTaskExecutorBlobService = taskExecutorBlobService; + } + + final TaskManagerConfiguration resolvedTaskManagerConfiguration; + + if (taskManagerConfiguration == null) { + resolvedTaskManagerConfiguration = + TaskManagerConfiguration.fromConfiguration( + configuration, + taskExecutorResourceSpec, + rpcService.getAddress(), + workingDirectory.getTmpDirectory()); + } else { + resolvedTaskManagerConfiguration = taskManagerConfiguration; + } + + final TaskManagerServices resolvedTaskManagerServices; + + if (taskManagerServices == null) { + final TaskManagerServicesConfiguration taskManagerServicesConfiguration = + TaskManagerServicesConfiguration.fromConfiguration( + configuration, + resourceId, + rpcService.getAddress(), + true, + taskExecutorResourceSpec, + workingDirectory); + resolvedTaskManagerServices = + TaskManagerServices.fromConfiguration( + taskManagerServicesConfiguration, + VoidPermanentBlobService.INSTANCE, + UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), + Executors.newDirectExecutorService(), + throwable -> {}, + workingDirectory); + } else { + resolvedTaskManagerServices = taskManagerServices; + } + + return new TaskExecutor( + rpcService, + resolvedTaskManagerConfiguration, + haServices, + resolvedTaskManagerServices, + externalResourceInfoProvider, + heartbeatServices, + taskManagerMetricGroup, + metricQueryServiceAddress, + resolvedTaskExecutorBlobService, + fatalErrorHandler, + partitionTracker); + } + + public static TaskExecutorBuilder newBuilder( + RpcService rpcService, + HighAvailabilityServices highAvailabilityServices, + WorkingDirectory workingDirectory) { + return new TaskExecutorBuilder(rpcService, highAvailabilityServices, workingDirectory); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorRecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorRecoveryTest.java new file mode 100644 index 0000000..8683182 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorRecoveryTest.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskexecutor; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.core.testutils.EachCallbackWrapper; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.SlotID; +import org.apache.flink.runtime.entrypoint.WorkingDirectory; +import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; +import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway; +import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder; +import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway; +import org.apache.flink.runtime.rpc.TestingRpcService; +import org.apache.flink.runtime.rpc.TestingRpcServiceExtension; +import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; +import org.apache.flink.util.TestLoggerExtension; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; +import java.util.Collection; +import java.util.HashSet; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; + +/** Recovery tests for {@link TaskExecutor}. */ +@ExtendWith(TestLoggerExtension.class) +class TaskExecutorRecoveryTest { + private final TestingRpcServiceExtension rpcServiceExtension = new TestingRpcServiceExtension(); + + @RegisterExtension + private final EachCallbackWrapper<TestingRpcServiceExtension> eachWrapper = + new EachCallbackWrapper<>(rpcServiceExtension); + + @Test + public void testRecoveredTaskExecutorWillRestoreAllocationState(@TempDir File tempDir) + throws Exception { + final ResourceID resourceId = ResourceID.generate(); + + final Configuration configuration = new Configuration(); + configuration.set(TaskManagerOptions.NUM_TASK_SLOTS, 2); + configuration.set(CheckpointingOptions.LOCAL_RECOVERY, true); + + final TestingResourceManagerGateway testingResourceManagerGateway = + new TestingResourceManagerGateway(); + final ArrayBlockingQueue<TaskExecutorSlotReport> queue = new ArrayBlockingQueue<>(2); + testingResourceManagerGateway.setSendSlotReportFunction( + slotReportInformation -> { + queue.offer( + TaskExecutorSlotReport.create( + slotReportInformation.f0, slotReportInformation.f2)); + return CompletableFuture.completedFuture(Acknowledge.get()); + }); + + final TestingRpcService rpcService = rpcServiceExtension.getTestingRpcService(); + rpcService.registerGateway( + testingResourceManagerGateway.getAddress(), testingResourceManagerGateway); + + final JobID jobId = new JobID(); + + final TestingHighAvailabilityServices highAvailabilityServices = + new TestingHighAvailabilityServices(); + + highAvailabilityServices.setResourceManagerLeaderRetriever( + new SettableLeaderRetrievalService( + testingResourceManagerGateway.getAddress(), + testingResourceManagerGateway.getFencingToken().toUUID())); + final SettableLeaderRetrievalService jobMasterLeaderRetriever = + new SettableLeaderRetrievalService(); + highAvailabilityServices.setJobMasterLeaderRetriever(jobId, jobMasterLeaderRetriever); + + final WorkingDirectory workingDirectory = WorkingDirectory.create(tempDir); + final TaskExecutor taskExecutor = + TaskExecutorBuilder.newBuilder( + rpcService, highAvailabilityServices, workingDirectory) + .setConfiguration(configuration) + .setResourceId(resourceId) + .build(); + + taskExecutor.start(); + + final TaskExecutorGateway taskExecutorGateway = + taskExecutor.getSelfGateway(TaskExecutorGateway.class); + + final TaskExecutorSlotReport taskExecutorSlotReport = queue.take(); + + final SlotReport slotReport = taskExecutorSlotReport.getSlotReport(); + + assertThat(slotReport.getNumSlotStatus(), is(2)); + + final SlotStatus slotStatus = slotReport.iterator().next(); + final SlotID allocatedSlotID = slotStatus.getSlotID(); + + final AllocationID allocationId = new AllocationID(); + taskExecutorGateway + .requestSlot( + allocatedSlotID, + jobId, + allocationId, + slotStatus.getResourceProfile(), + "localhost", + testingResourceManagerGateway.getFencingToken(), + Time.seconds(10L)) + .join(); + + taskExecutor.close(); + + final BlockingQueue<Collection<SlotOffer>> offeredSlots = new ArrayBlockingQueue<>(1); + + final TestingJobMasterGateway jobMasterGateway = + new TestingJobMasterGatewayBuilder() + .setOfferSlotsFunction( + (resourceID, slotOffers) -> { + offeredSlots.offer(new HashSet<>(slotOffers)); + return CompletableFuture.completedFuture(slotOffers); + }) + .build(); + rpcService.registerGateway(jobMasterGateway.getAddress(), jobMasterGateway); + jobMasterLeaderRetriever.notifyListener( + jobMasterGateway.getAddress(), jobMasterGateway.getFencingToken().toUUID()); + + // recover the TaskExecutor + final TaskExecutor recoveredTaskExecutor = + TaskExecutorBuilder.newBuilder( + rpcService, highAvailabilityServices, workingDirectory) + .setConfiguration(configuration) + .setResourceId(resourceId) + .build(); + + recoveredTaskExecutor.start(); + + final TaskExecutorSlotReport recoveredSlotReport = queue.take(); + + for (SlotStatus status : recoveredSlotReport.getSlotReport()) { + if (status.getSlotID().equals(allocatedSlotID)) { + assertThat(status.getJobID(), is(jobId)); + assertThat(status.getAllocationID(), is(allocationId)); + } else { + assertThat(status.getJobID(), is(nullValue())); + } + } + + final Collection<SlotOffer> take = offeredSlots.take(); + + assertThat(take, hasSize(1)); + final SlotOffer offeredSlot = take.iterator().next(); + + assertThat(offeredSlot.getAllocationId(), is(allocationId)); + } + + private static final class TaskExecutorSlotReport { + private final ResourceID taskExecutorResourceId; + private final SlotReport slotReport; + + private TaskExecutorSlotReport(ResourceID taskExecutorResourceId, SlotReport slotReport) { + this.taskExecutorResourceId = taskExecutorResourceId; + this.slotReport = slotReport; + } + + public ResourceID getTaskExecutorResourceId() { + return taskExecutorResourceId; + } + + public SlotReport getSlotReport() { + return slotReport; + } + + public static TaskExecutorSlotReport create( + ResourceID taskExecutorResourceId, SlotReport slotReport) { + return new TaskExecutorSlotReport(taskExecutorResourceId, slotReport); + } + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/WorkingDirectoryExtension.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/WorkingDirectoryExtension.java index df58674..cf42fac 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/WorkingDirectoryExtension.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/WorkingDirectoryExtension.java @@ -21,17 +21,19 @@ package org.apache.flink.runtime.testutils; import org.apache.flink.core.testutils.CustomExtension; import org.apache.flink.runtime.entrypoint.WorkingDirectory; -import org.junit.jupiter.api.io.TempDir; - import java.io.File; import java.io.IOException; /** Extension to generate {@link WorkingDirectory}. */ public class WorkingDirectoryExtension implements CustomExtension { - @TempDir File tmpDirectory; + private final File tmpDirectory; private int counter = 0; + public WorkingDirectoryExtension(File tmpDirectory) { + this.tmpDirectory = tmpDirectory; + } + public WorkingDirectory createNewWorkingDirectory() throws IOException { return WorkingDirectory.create(new File(tmpDirectory, "working_directory_" + counter++)); }