This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 45c661e02de25e315a81473586782519c22942fc
Author: Till Rohrmann <trohrm...@apache.org>
AuthorDate: Thu Nov 18 21:25:43 2021 +0100

    [FLINK-25817] Test that the TaskExecutor can recover persisted allocation ids
---
 .../runtime/taskexecutor/TaskExecutorBuilder.java  | 176 ++++++++++++++++++
 .../taskexecutor/TaskExecutorRecoveryTest.java     | 206 +++++++++++++++++++++
 .../testutils/WorkingDirectoryExtension.java       |   8 +-
 3 files changed, 387 insertions(+), 3 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorBuilder.java
new file mode 100644
index 0000000..7fc65bc
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorBuilder.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.api.common.resources.CPUResource;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.runtime.blob.BlobCacheService;
+import org.apache.flink.runtime.blob.NoOpTaskExecutorBlobService;
+import org.apache.flink.runtime.blob.TaskExecutorBlobService;
+import org.apache.flink.runtime.blob.VoidPermanentBlobService;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.entrypoint.WorkingDirectory;
+import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import 
org.apache.flink.runtime.io.network.partition.TaskExecutorPartitionTracker;
+import 
org.apache.flink.runtime.io.network.partition.TestingTaskExecutorPartitionTracker;
+import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.rest.util.NoOpFatalErrorHandler;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.util.concurrent.Executors;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+
+/** Builder for testing {@link TaskExecutor}. */
+public class TaskExecutorBuilder {
+
+    private final RpcService rpcService;
+
+    private final HighAvailabilityServices haServices;
+
+    private ResourceID resourceId = ResourceID.generate();
+
+    private Configuration configuration = new Configuration();
+
+    @Nullable private TaskManagerConfiguration taskManagerConfiguration = null;
+
+    @Nullable private TaskManagerServices taskManagerServices = null;
+
+    private ExternalResourceInfoProvider externalResourceInfoProvider =
+            ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES;
+
+    private HeartbeatServices heartbeatServices = new HeartbeatServices(1 << 
20, 1 << 20);
+
+    private TaskManagerMetricGroup taskManagerMetricGroup =
+            
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup();
+
+    @Nullable private String metricQueryServiceAddress = null;
+
+    @Nullable private BlobCacheService taskExecutorBlobService = null;
+
+    private FatalErrorHandler fatalErrorHandler = 
NoOpFatalErrorHandler.INSTANCE;
+
+    private TaskExecutorPartitionTracker partitionTracker =
+            new TestingTaskExecutorPartitionTracker();
+
+    private TaskExecutorResourceSpec taskExecutorResourceSpec =
+            new TaskExecutorResourceSpec(
+                    new CPUResource(1.0),
+                    MemorySize.ofMebiBytes(1024),
+                    MemorySize.ofMebiBytes(1024),
+                    MemorySize.ofMebiBytes(1024),
+                    MemorySize.ofMebiBytes(1024),
+                    Collections.emptyList());
+
+    private final WorkingDirectory workingDirectory;
+
+    private TaskExecutorBuilder(
+            RpcService rpcService,
+            HighAvailabilityServices haServices,
+            WorkingDirectory workingDirectory) {
+        this.rpcService = rpcService;
+        this.haServices = haServices;
+        this.workingDirectory = workingDirectory;
+    }
+
+    public TaskExecutorBuilder setConfiguration(Configuration configuration) {
+        this.configuration = configuration;
+        return this;
+    }
+
+    public TaskExecutorBuilder setResourceId(ResourceID resourceId) {
+        this.resourceId = resourceId;
+        return this;
+    }
+
+    public TaskExecutor build() throws Exception {
+
+        final TaskExecutorBlobService resolvedTaskExecutorBlobService;
+
+        TaskExecutorResourceUtils.adjustForLocalExecution(configuration);
+
+        if (taskExecutorBlobService == null) {
+            resolvedTaskExecutorBlobService = 
NoOpTaskExecutorBlobService.INSTANCE;
+        } else {
+            resolvedTaskExecutorBlobService = taskExecutorBlobService;
+        }
+
+        final TaskManagerConfiguration resolvedTaskManagerConfiguration;
+
+        if (taskManagerConfiguration == null) {
+            resolvedTaskManagerConfiguration =
+                    TaskManagerConfiguration.fromConfiguration(
+                            configuration,
+                            taskExecutorResourceSpec,
+                            rpcService.getAddress(),
+                            workingDirectory.getTmpDirectory());
+        } else {
+            resolvedTaskManagerConfiguration = taskManagerConfiguration;
+        }
+
+        final TaskManagerServices resolvedTaskManagerServices;
+
+        if (taskManagerServices == null) {
+            final TaskManagerServicesConfiguration 
taskManagerServicesConfiguration =
+                    TaskManagerServicesConfiguration.fromConfiguration(
+                            configuration,
+                            resourceId,
+                            rpcService.getAddress(),
+                            true,
+                            taskExecutorResourceSpec,
+                            workingDirectory);
+            resolvedTaskManagerServices =
+                    TaskManagerServices.fromConfiguration(
+                            taskManagerServicesConfiguration,
+                            VoidPermanentBlobService.INSTANCE,
+                            
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
+                            Executors.newDirectExecutorService(),
+                            throwable -> {},
+                            workingDirectory);
+        } else {
+            resolvedTaskManagerServices = taskManagerServices;
+        }
+
+        return new TaskExecutor(
+                rpcService,
+                resolvedTaskManagerConfiguration,
+                haServices,
+                resolvedTaskManagerServices,
+                externalResourceInfoProvider,
+                heartbeatServices,
+                taskManagerMetricGroup,
+                metricQueryServiceAddress,
+                resolvedTaskExecutorBlobService,
+                fatalErrorHandler,
+                partitionTracker);
+    }
+
+    public static TaskExecutorBuilder newBuilder(
+            RpcService rpcService,
+            HighAvailabilityServices highAvailabilityServices,
+            WorkingDirectory workingDirectory) {
+        return new TaskExecutorBuilder(rpcService, highAvailabilityServices, 
workingDirectory);
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorRecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorRecoveryTest.java
new file mode 100644
index 0000000..8683182
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorRecoveryTest.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.testutils.EachCallbackWrapper;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.entrypoint.WorkingDirectory;
+import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
+import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
+import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
+import org.apache.flink.runtime.messages.Acknowledge;
+import 
org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.rpc.TestingRpcServiceExtension;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
+
+/** Recovery tests for {@link TaskExecutor}. */
+@ExtendWith(TestLoggerExtension.class)
+class TaskExecutorRecoveryTest {
+    private final TestingRpcServiceExtension rpcServiceExtension = new TestingRpcServiceExtension();
+
+    @RegisterExtension
+    private final EachCallbackWrapper<TestingRpcServiceExtension> eachWrapper =
+            new EachCallbackWrapper<>(rpcServiceExtension);
+
+    /**
+     * Allocates a slot on a TaskExecutor, stops it, and starts a new TaskExecutor with the same
+     * {@link ResourceID} and {@link WorkingDirectory}. Verifies that the new instance reports the
+     * allocation to the ResourceManager and offers the recovered slot to the JobMaster.
+     */
+    @Test
+    public void testRecoveredTaskExecutorWillRestoreAllocationState(@TempDir File tempDir)
+            throws Exception {
+        final ResourceID resourceId = ResourceID.generate();
+
+        final Configuration configuration = new Configuration();
+        configuration.set(TaskManagerOptions.NUM_TASK_SLOTS, 2);
+        configuration.set(CheckpointingOptions.LOCAL_RECOVERY, true);
+
+        final TestingResourceManagerGateway testingResourceManagerGateway =
+                new TestingResourceManagerGateway();
+        // One slot report is expected per started TaskExecutor (original + recovered).
+        final ArrayBlockingQueue<TaskExecutorSlotReport> slotReports = new ArrayBlockingQueue<>(2);
+        testingResourceManagerGateway.setSendSlotReportFunction(
+                slotReportInformation -> {
+                    slotReports.offer(
+                            TaskExecutorSlotReport.create(
+                                    slotReportInformation.f0, slotReportInformation.f2));
+                    return CompletableFuture.completedFuture(Acknowledge.get());
+                });
+
+        final TestingRpcService rpcService = rpcServiceExtension.getTestingRpcService();
+        rpcService.registerGateway(
+                testingResourceManagerGateway.getAddress(), testingResourceManagerGateway);
+
+        final JobID jobId = new JobID();
+
+        final TestingHighAvailabilityServices highAvailabilityServices =
+                new TestingHighAvailabilityServices();
+
+        highAvailabilityServices.setResourceManagerLeaderRetriever(
+                new SettableLeaderRetrievalService(
+                        testingResourceManagerGateway.getAddress(),
+                        testingResourceManagerGateway.getFencingToken().toUUID()));
+        final SettableLeaderRetrievalService jobMasterLeaderRetriever =
+                new SettableLeaderRetrievalService();
+        highAvailabilityServices.setJobMasterLeaderRetriever(jobId, jobMasterLeaderRetriever);
+
+        final WorkingDirectory workingDirectory = WorkingDirectory.create(tempDir);
+        final AllocationID allocationId = new AllocationID();
+        final SlotID allocatedSlotID;
+
+        final TaskExecutor taskExecutor =
+                TaskExecutorBuilder.newBuilder(rpcService, highAvailabilityServices, workingDirectory)
+                        .setConfiguration(configuration)
+                        .setResourceId(resourceId)
+                        .build();
+
+        try {
+            taskExecutor.start();
+
+            final TaskExecutorGateway taskExecutorGateway =
+                    taskExecutor.getSelfGateway(TaskExecutorGateway.class);
+
+            final SlotReport slotReport = slotReports.take().getSlotReport();
+            assertThat(slotReport.getNumSlotStatus(), is(2));
+
+            final SlotStatus slotStatus = slotReport.iterator().next();
+            allocatedSlotID = slotStatus.getSlotID();
+
+            taskExecutorGateway
+                    .requestSlot(
+                            allocatedSlotID,
+                            jobId,
+                            allocationId,
+                            slotStatus.getResourceProfile(),
+                            "localhost",
+                            testingResourceManagerGateway.getFencingToken(),
+                            Time.seconds(10L))
+                    .join();
+        } finally {
+            // Stop the first TaskExecutor (also if the allocation failed); the next instance
+            // is built on the same working directory to recover the allocation.
+            taskExecutor.close();
+        }
+
+        final BlockingQueue<Collection<SlotOffer>> offeredSlots = new ArrayBlockingQueue<>(1);
+
+        final TestingJobMasterGateway jobMasterGateway =
+                new TestingJobMasterGatewayBuilder()
+                        .setOfferSlotsFunction(
+                                (resourceID, slotOffers) -> {
+                                    offeredSlots.offer(new HashSet<>(slotOffers));
+                                    return CompletableFuture.completedFuture(slotOffers);
+                                })
+                        .build();
+        rpcService.registerGateway(jobMasterGateway.getAddress(), jobMasterGateway);
+        jobMasterLeaderRetriever.notifyListener(
+                jobMasterGateway.getAddress(), jobMasterGateway.getFencingToken().toUUID());
+
+        // recover the TaskExecutor
+        final TaskExecutor recoveredTaskExecutor =
+                TaskExecutorBuilder.newBuilder(rpcService, highAvailabilityServices, workingDirectory)
+                        .setConfiguration(configuration)
+                        .setResourceId(resourceId)
+                        .build();
+
+        try {
+            recoveredTaskExecutor.start();
+
+            final SlotReport recoveredSlotReport = slotReports.take().getSlotReport();
+
+            boolean allocatedSlotReported = false;
+            for (SlotStatus status : recoveredSlotReport) {
+                if (status.getSlotID().equals(allocatedSlotID)) {
+                    allocatedSlotReported = true;
+                    assertThat(status.getJobID(), is(jobId));
+                    assertThat(status.getAllocationID(), is(allocationId));
+                } else {
+                    assertThat(status.getJobID(), is(nullValue()));
+                }
+            }
+            // Without this check the loop above would pass vacuously if the slot were missing.
+            assertThat(allocatedSlotReported, is(true));
+
+            final Collection<SlotOffer> recoveredSlotOffers = offeredSlots.take();
+
+            assertThat(recoveredSlotOffers, hasSize(1));
+            final SlotOffer offeredSlot = recoveredSlotOffers.iterator().next();
+
+            assertThat(offeredSlot.getAllocationId(), is(allocationId));
+        } finally {
+            // Previously the recovered TaskExecutor was never shut down.
+            recoveredTaskExecutor.close();
+        }
+    }
+
+    /** A slot report together with the {@link ResourceID} of the TaskExecutor that sent it. */
+    private static final class TaskExecutorSlotReport {
+        private final ResourceID taskExecutorResourceId;
+        private final SlotReport slotReport;
+
+        private TaskExecutorSlotReport(ResourceID taskExecutorResourceId, SlotReport slotReport) {
+            this.taskExecutorResourceId = taskExecutorResourceId;
+            this.slotReport = slotReport;
+        }
+
+        public ResourceID getTaskExecutorResourceId() {
+            return taskExecutorResourceId;
+        }
+
+        public SlotReport getSlotReport() {
+            return slotReport;
+        }
+
+        public static TaskExecutorSlotReport create(
+                ResourceID taskExecutorResourceId, SlotReport slotReport) {
+            return new TaskExecutorSlotReport(taskExecutorResourceId, slotReport);
+        }
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/WorkingDirectoryExtension.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/WorkingDirectoryExtension.java
index df58674..cf42fac 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/WorkingDirectoryExtension.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/WorkingDirectoryExtension.java
@@ -21,17 +21,19 @@ package org.apache.flink.runtime.testutils;
 import org.apache.flink.core.testutils.CustomExtension;
 import org.apache.flink.runtime.entrypoint.WorkingDirectory;
 
-import org.junit.jupiter.api.io.TempDir;
-
 import java.io.File;
 import java.io.IOException;
 
 /** Extension to generate {@link WorkingDirectory}. */
 public class WorkingDirectoryExtension implements CustomExtension {
-    @TempDir File tmpDirectory;
+    private final File tmpDirectory;
 
     private int counter = 0;
 
+    public WorkingDirectoryExtension(File tmpDirectory) {
+        this.tmpDirectory = tmpDirectory;
+    }
+
     public WorkingDirectory createNewWorkingDirectory() throws IOException {
         return WorkingDirectory.create(new File(tmpDirectory, 
"working_directory_" + counter++));
     }

Reply via email to