This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f95a0c7e867b7af43be86701a6586f39e73da00f
Author: slinkydeveloper <francescogu...@gmail.com>
AuthorDate: Wed Nov 17 18:15:51 2021 +0100

    [FLINK-25817] TaskSlotTable persists the slot allocation IDs on the local filesystem
    
    Signed-off-by: slinkydeveloper <francescogu...@gmail.com>
---
 .../flink/runtime/taskexecutor/TaskExecutor.java   | 133 +++++++++++++++++++++
 .../slot/LocalSlotAllocationSnapshot.java          |  86 +++++++++++++
 2 files changed, 219 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index fa8f7d0..ace71f0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -115,6 +115,7 @@ import org.apache.flink.runtime.taskexecutor.rpc.RpcKvStateRegistryListener;
 import org.apache.flink.runtime.taskexecutor.rpc.RpcPartitionStateChecker;
 import 
org.apache.flink.runtime.taskexecutor.rpc.RpcResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.taskexecutor.rpc.RpcTaskOperatorEventGateway;
+import org.apache.flink.runtime.taskexecutor.slot.LocalSlotAllocationSnapshot;
 import org.apache.flink.runtime.taskexecutor.slot.SlotActions;
 import org.apache.flink.runtime.taskexecutor.slot.SlotNotActiveException;
 import org.apache.flink.runtime.taskexecutor.slot.SlotNotFoundException;
@@ -129,6 +130,7 @@ import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.threadinfo.ThreadInfoSamplesRequest;
 import org.apache.flink.types.SerializableOptional;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.OptionalConsumer;
 import org.apache.flink.util.Preconditions;
@@ -145,8 +147,11 @@ import javax.annotation.Nullable;
 
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -236,6 +241,9 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 
     private final LeaderRetrievalService resourceManagerLeaderRetriever;
 
+    /** Directory to read/write allocation files. */
+    private final File allocationsDirectory;
+
     // ------------------------------------------------------------------------
 
     private final HardwareDescription hardwareDescription;
@@ -334,6 +342,8 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
         ScheduledExecutorService sampleExecutor =
                 
Executors.newSingleThreadScheduledExecutor(sampleThreadFactory);
         this.threadInfoSampleService = new 
ThreadInfoSampleService(sampleExecutor);
+
+        this.allocationsDirectory = null;
     }
 
     private HeartbeatManager<Void, TaskExecutorHeartbeatPayload>
@@ -422,6 +432,8 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
                     new FileCache(
                             taskManagerConfiguration.getTmpDirectories(),
                             taskExecutorBlobService.getPermanentBlobService());
+
+            tryLoadLocalAllocationSnapshots();
         } catch (Exception e) {
             handleStartTaskExecutorServicesException(e);
         }
@@ -1066,6 +1078,8 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
             return FutureUtils.completedExceptionally(new 
TaskManagerException(message));
         }
 
+        tryPersistAllocationSnapshot(slotId, jobId, targetAddress, 
allocationId, resourceProfile);
+
         try {
             allocateSlot(slotId, jobId, allocationId, resourceProfile);
         } catch (SlotAllocationException sae) {
@@ -1901,6 +1915,8 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 
             final int slotIndex = taskSlotTable.freeSlot(allocationId, cause);
 
+            tryCleanupSlotAllocationSnapshot(slotIndex);
+
             if (slotIndex != -1) {
 
                 if (isConnectedToResourceManager()) {
@@ -2015,6 +2031,123 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
         }
     }
 
+    /**
+     * Best-effort persistence of a slot allocation into the local allocations directory, so that
+     * {@link #tryLoadLocalAllocationSnapshots()} can restore it later.
+     *
+     * <p>The snapshot is Java-serialized to {@code <slot number>.bin}. A missing/unusable
+     * directory and I/O failures are logged at debug level and otherwise ignored.
+     *
+     * @param slotId slot being allocated; its slot number determines the file name
+     * @param jobId job the slot is allocated for
+     * @param jobTargetAddress the job's target address from the slot request; stored so the job
+     *     can be re-registered on restore
+     * @param allocationId allocation id of the slot
+     * @param resourceProfile resource profile requested for the slot
+     */
+    private void tryPersistAllocationSnapshot(
+            SlotID slotId,
+            JobID jobId,
+            String jobTargetAddress,
+            AllocationID allocationId,
+            ResourceProfile resourceProfile) {
+        if (allocationsDirectory == null) {
+            // The directory may be left unset by the constructor; treat that as "persistence
+            // disabled" instead of failing the slot request with a NullPointerException.
+            log.debug(
+                    "No allocations directory configured, skipping allocation snapshot for job {} and allocation {}.",
+                    jobId,
+                    allocationId);
+            return;
+        }
+
+        if (!allocationsDirectory.exists() && !allocationsDirectory.mkdirs()) {
+            log.debug(
+                    "Allocations directory {} doesn't exist and cannot be created.",
+                    allocationsDirectory.toPath());
+            return;
+        }
+
+        // NOTE(review): the file is keyed by slotId.getSlotNumber(), while cleanup is keyed by
+        // the index returned by TaskSlotTable#freeSlot; confirm both agree for dynamic slots.
+        final File slotAllocationSnapshotFile = slotAllocationFile(slotId.getSlotNumber());
+        // Both streams are declared as resources so the file handle is closed even if the
+        // ObjectOutputStream constructor (which writes the stream header) throws.
+        try (FileOutputStream fos = new FileOutputStream(slotAllocationSnapshotFile);
+                ObjectOutputStream oos = new ObjectOutputStream(fos)) {
+            oos.writeObject(
+                    new LocalSlotAllocationSnapshot(
+                            slotId, jobId, jobTargetAddress, allocationId, resourceProfile));
+            log.debug(
+                    "Successfully written allocation state metadata file {} for job {} and allocation {}.",
+                    slotAllocationSnapshotFile.toPath(),
+                    jobId,
+                    allocationId);
+        } catch (IOException e) {
+            log.debug(
+                    "Cannot write the local allocations state. File {} for job {} and allocation {}.",
+                    slotAllocationSnapshotFile.toPath(),
+                    jobId,
+                    allocationId,
+                    e);
+        }
+    }
+
+    /**
+     * Best-effort deletion of the allocation snapshot file for the given slot index. Failures are
+     * logged at debug level and otherwise ignored.
+     *
+     * @param requestedIndex slot index returned by {@code TaskSlotTable#freeSlot}; negative
+     *     values (the caller uses {@code -1} to mean "no slot was freed") are ignored
+     */
+    private void tryCleanupSlotAllocationSnapshot(int requestedIndex) {
+        if (requestedIndex < 0) {
+            // Nothing was freed, so there is no snapshot to remove; never touch "-1.bin".
+            return;
+        }
+
+        if (allocationsDirectory == null || !allocationsDirectory.exists()) {
+            log.debug(
+                    "There is no local allocations snapshot directory to cleanup {}.",
+                    allocationsDirectory);
+            return;
+        }
+
+        // Let's try to delete the snapshot file of the freed slot
+        final File slotAllocationSnapshotFile = slotAllocationFile(requestedIndex);
+        try {
+            FileUtils.deleteFileOrDirectory(slotAllocationSnapshotFile);
+            log.debug(
+                    "Successfully deleted allocation state metadata file {}.",
+                    slotAllocationSnapshotFile.toPath());
+        } catch (IOException e) {
+            log.debug(
+                    "Cannot delete the local allocations state file {}.",
+                    slotAllocationSnapshotFile.toPath(),
+                    e);
+        }
+    }
+
+    private File slotAllocationFile(int slotIndex) {
+        return new File(allocationsDirectory.getAbsolutePath(), slotIndex + 
".bin");
+    }
+
+    /**
+     * This method tries to repopulate the {@link JobTable} and {@link TaskSlotTable} from the local
+     * filesystem in a best-effort manner.
+     *
+     * <p>Every {@code *.bin} file in the allocations directory is deserialized as a {@link
+     * LocalSlotAllocationSnapshot}. For each snapshot the slot is re-allocated and the job is
+     * registered with its persisted target address. Unreadable files and snapshots that cannot be
+     * restored are logged at debug level and skipped.
+     */
+    private void tryLoadLocalAllocationSnapshots() {
+        if (allocationsDirectory == null) {
+            // Persistence is disabled; without this guard the NPE would be routed to
+            // handleStartTaskExecutorServicesException by the start-up code.
+            log.debug(
+                    "No allocations directory configured, skipping allocation snapshot restore.");
+            return;
+        }
+
+        if (!allocationsDirectory.exists()) {
+            log.debug(
+                    "There is no local allocations snapshot directory to load from {}.",
+                    allocationsDirectory.toPath());
+            return;
+        }
+
+        // Only consider files named like those written by tryPersistAllocationSnapshot.
+        final File[] slotAllocationFiles =
+                allocationsDirectory.listFiles((dir, name) -> name.endsWith(".bin"));
+        if (slotAllocationFiles == null) {
+            log.debug("No allocation files to load.");
+            return;
+        }
+
+        final List<LocalSlotAllocationSnapshot> allocations =
+                new ArrayList<>(slotAllocationFiles.length);
+
+        for (File allocationFile : slotAllocationFiles) {
+            // NOTE(review): Java deserialization of local files is only acceptable while this
+            // directory is private to the task executor; consider a safer format otherwise.
+            // Both streams are resources so the file handle is closed even if reading the
+            // serialization header fails (e.g. on a truncated file).
+            try (FileInputStream fis = new FileInputStream(allocationFile);
+                    ObjectInputStream ois = new ObjectInputStream(fis)) {
+                final Object snapshot = ois.readObject();
+                if (snapshot instanceof LocalSlotAllocationSnapshot) {
+                    allocations.add((LocalSlotAllocationSnapshot) snapshot);
+                } else {
+                    // A blind cast would throw an uncaught ClassCastException here.
+                    log.debug(
+                            "Ignoring local allocations state file {} with unexpected content of type {}.",
+                            allocationFile.toPath(),
+                            snapshot == null ? "null" : snapshot.getClass().getName());
+                }
+            } catch (IOException | ClassNotFoundException e) {
+                log.debug(
+                        "Cannot read the local allocations state file {}.",
+                        allocationFile.toPath(),
+                        e);
+            }
+        }
+
+        log.debug("Resolved allocation files {}.", allocations);
+
+        for (LocalSlotAllocationSnapshot allocationSnapshot : allocations) {
+            try {
+                allocateSlot(
+                        allocationSnapshot.getSlotID(),
+                        allocationSnapshot.getJobId(),
+                        allocationSnapshot.getAllocationId(),
+                        allocationSnapshot.getResourceProfile());
+
+                jobTable.getOrCreateJob(
+                        allocationSnapshot.getJobId(),
+                        () ->
+                                registerNewJobAndCreateServices(
+                                        allocationSnapshot.getJobId(),
+                                        allocationSnapshot.getJobTargetAddress()));
+            } catch (Exception e) {
+                log.debug("Cannot reallocate restored slot {}.", allocationSnapshot, e);
+            }
+        }
+    }
+
     // ------------------------------------------------------------------------
     //  Internal utility methods
     // ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/LocalSlotAllocationSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/LocalSlotAllocationSnapshot.java
new file mode 100644
index 0000000..c47e5ba
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/LocalSlotAllocationSnapshot.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.slot;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+
+/**
+ * Snapshot of a single slot allocation: the slot, its job, the job's target address, the
+ * allocation id and the resource profile. Instances are meant to be stored on and read back
+ * from the local filesystem via Java serialization.
+ *
+ * <p>Field names and {@code serialVersionUID} define the serialized form; changing them breaks
+ * compatibility with previously written snapshots.
+ */
+public class LocalSlotAllocationSnapshot implements java.io.Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final SlotID slotID;
+    private final JobID jobId;
+    private final String jobTargetAddress;
+    private final AllocationID allocationId;
+    private final ResourceProfile resourceProfile;
+
+    /** Creates a snapshot holding the given values as-is; no validation is performed. */
+    public LocalSlotAllocationSnapshot(
+            SlotID slotID,
+            JobID jobId,
+            String jobTargetAddress,
+            AllocationID allocationId,
+            ResourceProfile resourceProfile) {
+        this.resourceProfile = resourceProfile;
+        this.allocationId = allocationId;
+        this.jobTargetAddress = jobTargetAddress;
+        this.jobId = jobId;
+        this.slotID = slotID;
+    }
+
+    /** Returns the allocated slot. */
+    public SlotID getSlotID() {
+        return this.slotID;
+    }
+
+    /** Returns the job the slot is allocated for. */
+    public JobID getJobId() {
+        return this.jobId;
+    }
+
+    /** Returns the job target address captured with the allocation. */
+    public String getJobTargetAddress() {
+        return this.jobTargetAddress;
+    }
+
+    /** Returns the allocation id. */
+    public AllocationID getAllocationId() {
+        return this.allocationId;
+    }
+
+    /** Returns the resource profile of the allocation. */
+    public ResourceProfile getResourceProfile() {
+        return this.resourceProfile;
+    }
+
+    @Override
+    public String toString() {
+        return new StringBuilder("LocalSlotAllocationSnapshot{")
+                .append("slotID=")
+                .append(slotID)
+                .append(", jobId=")
+                .append(jobId)
+                .append(", jobTargetAddress='")
+                .append(jobTargetAddress)
+                .append('\'')
+                .append(", allocationId=")
+                .append(allocationId)
+                .append(", resourceProfile=")
+                .append(resourceProfile)
+                .append('}')
+                .toString();
+    }
+}

Reply via email to