tillrohrmann commented on a change in pull request #18237:
URL: https://github.com/apache/flink/pull/18237#discussion_r800074364



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
##########
@@ -2015,6 +2031,123 @@ private void freeNoLongerUsedSlots(AllocatedSlotReport allocatedSlotReport) {
         }
     }
 
+    private void tryPersistAllocationSnapshot(
+            SlotID slotId,
+            JobID jobId,
+            String jobTargetAddress,
+            AllocationID allocationId,
+            ResourceProfile resourceProfile) {
+        if (!allocationsDirectory.exists() && !allocationsDirectory.mkdirs()) {
+            log.debug(
+                    "Allocations directory {} doesn't exist and cannot be 
created.",
+                    allocationsDirectory.toPath());
+            return;
+        }
+
+        // Let's try to write the slot allocations on file
+        final File slotAllocationSnapshotFile = 
slotAllocationFile(slotId.getSlotNumber());
+        try (ObjectOutputStream oos =
+                new ObjectOutputStream(new 
FileOutputStream(slotAllocationSnapshotFile))) {
+            oos.writeObject(
+                    new LocalSlotAllocationSnapshot(
+                            slotId, jobId, jobTargetAddress, allocationId, 
resourceProfile));
+            log.debug(
+                    "Successfully written allocation state metadata file {} 
for job {} and allocation {}.",
+                    slotAllocationSnapshotFile.toPath(),
+                    jobId,
+                    allocationId);
+        } catch (IOException e) {
+            log.debug(
+                    "Cannot write the local allocations state. File {} for job 
{} and allocation {}.",
+                    slotAllocationSnapshotFile.toPath(),
+                    jobId,
+                    allocationId,
+                    e);
+        }
+    }
+
+    private void tryCleanupSlotAllocationSnapshot(int requestedIndex) {
+        if (!allocationsDirectory.exists()) {
+            log.debug(
+                    "There is no local allocations snapshot directory to 
cleanup {}.",
+                    allocationsDirectory.toPath());
+            return;
+        }
+
+        // Let's try to write the slot allocations on file
+        final File slotAllocationSnapshotFile = 
slotAllocationFile(requestedIndex);
+        try {
+            FileUtils.deleteFileOrDirectory(slotAllocationSnapshotFile);
+            log.debug(
+                    "Successfully deleted allocation state metadata file {}.",
+                    slotAllocationSnapshotFile.toPath());
+        } catch (IOException e) {
+            log.debug(
+                    "Cannot delete the local allocations state file {}.",
+                    slotAllocationSnapshotFile.toPath(),
+                    e);
+        }
+    }
+
+    private File slotAllocationFile(int slotIndex) {
+        return new File(allocationsDirectory.getAbsolutePath(), slotIndex + 
".bin");
+    }
+
+    /**
+     * This method tries to repopulate the {@link JobTable} and {@link 
TaskSlotTable} from the local
+     * filesystem in a best-effort manner.
+     */
+    private void tryLoadLocalAllocationSnapshots() {
+        if (!allocationsDirectory.exists()) {
+            log.debug(
+                    "There is no local allocations snapshot directory to load 
from {}.",
+                    allocationsDirectory.toPath());
+            return;
+        }
+
+        // Let's try to populate the slot allocation from local file
+        final File[] slotAllocationFiles = allocationsDirectory.listFiles();
+        if (slotAllocationFiles == null) {
+            log.debug("No allocation files to load.");
+            return;
+        }
+
+        List<LocalSlotAllocationSnapshot> allocations = new 
ArrayList<>(slotAllocationFiles.length);
+
+        for (File allocationFile : slotAllocationFiles) {
+            try (ObjectInputStream ois =
+                    new ObjectInputStream(new 
FileInputStream(allocationFile))) {
+                allocations.add((LocalSlotAllocationSnapshot) 
ois.readObject());
+            } catch (IOException | ClassNotFoundException e) {
+                log.debug(
+                        "Cannot read the local allocations state file {}.",
+                        allocationFile.toPath(),
+                        e);
+            }
+        }
+
+        log.debug("Resolved allocation files {}.", allocations);
+
+        for (LocalSlotAllocationSnapshot allocationSnapshot : allocations) {
+            try {
+                allocateSlot(
+                        allocationSnapshot.getSlotID(),
+                        allocationSnapshot.getJobId(),
+                        allocationSnapshot.getAllocationId(),
+                        allocationSnapshot.getResourceProfile());
+
+                jobTable.getOrCreateJob(
+                        allocationSnapshot.getJobId(),
+                        () ->
+                                registerNewJobAndCreateServices(
+                                        allocationSnapshot.getJobId(),
+                                        
allocationSnapshot.getJobTargetAddress()));
+            } catch (Exception e) {
+                log.debug("Cannot reallocate restored slot {}.", 
allocationSnapshot, e);

Review comment:
       I will see whether I can unify both methods. I think this is a good 
improvement.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to