tillrohrmann commented on a change in pull request #18237:
URL: https://github.com/apache/flink/pull/18237#discussion_r800074364
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
##########
@@ -2015,6 +2031,123 @@ private void freeNoLongerUsedSlots(AllocatedSlotReport allocatedSlotReport) {
}
}
+ private void tryPersistAllocationSnapshot(
+ SlotID slotId,
+ JobID jobId,
+ String jobTargetAddress,
+ AllocationID allocationId,
+ ResourceProfile resourceProfile) {
+ if (!allocationsDirectory.exists() && !allocationsDirectory.mkdirs()) {
+ log.debug(
+ "Allocations directory {} doesn't exist and cannot be created.",
+ allocationsDirectory.toPath());
+ return;
+ }
+
+ // Let's try to write the slot allocations on file
+ final File slotAllocationSnapshotFile = slotAllocationFile(slotId.getSlotNumber());
+ try (ObjectOutputStream oos =
+ new ObjectOutputStream(new FileOutputStream(slotAllocationSnapshotFile))) {
+ oos.writeObject(
+ new LocalSlotAllocationSnapshot(
+ slotId, jobId, jobTargetAddress, allocationId, resourceProfile));
+ log.debug(
+ "Successfully written allocation state metadata file {} for job {} and allocation {}.",
+ slotAllocationSnapshotFile.toPath(),
+ jobId,
+ allocationId);
+ } catch (IOException e) {
+ log.debug(
+ "Cannot write the local allocations state. File {} for job {} and allocation {}.",
+ slotAllocationSnapshotFile.toPath(),
+ jobId,
+ allocationId,
+ e);
+ }
+ }
+
+ private void tryCleanupSlotAllocationSnapshot(int requestedIndex) {
+ if (!allocationsDirectory.exists()) {
+ log.debug(
+ "There is no local allocations snapshot directory to cleanup {}.",
+ allocationsDirectory.toPath());
+ return;
+ }
+
+ // Let's try to write the slot allocations on file
+ final File slotAllocationSnapshotFile = slotAllocationFile(requestedIndex);
+ try {
+ FileUtils.deleteFileOrDirectory(slotAllocationSnapshotFile);
+ log.debug(
+ "Successfully deleted allocation state metadata file {}.",
+ slotAllocationSnapshotFile.toPath());
+ } catch (IOException e) {
+ log.debug(
+ "Cannot delete the local allocations state file {}.",
+ slotAllocationSnapshotFile.toPath(),
+ e);
+ }
+ }
+
+ private File slotAllocationFile(int slotIndex) {
+ return new File(allocationsDirectory.getAbsolutePath(), slotIndex + ".bin");
+ }
+
+ /**
+ * This method tries to repopulate the {@link JobTable} and {@link TaskSlotTable} from the local
+ * filesystem in a best-effort manner.
+ */
+ private void tryLoadLocalAllocationSnapshots() {
+ if (!allocationsDirectory.exists()) {
+ log.debug(
+ "There is no local allocations snapshot directory to load from {}.",
+ allocationsDirectory.toPath());
+ return;
+ }
+
+ // Let's try to populate the slot allocation from local file
+ final File[] slotAllocationFiles = allocationsDirectory.listFiles();
+ if (slotAllocationFiles == null) {
+ log.debug("No allocation files to load.");
+ return;
+ }
+
+ List<LocalSlotAllocationSnapshot> allocations = new ArrayList<>(slotAllocationFiles.length);
+
+ for (File allocationFile : slotAllocationFiles) {
+ try (ObjectInputStream ois =
+ new ObjectInputStream(new FileInputStream(allocationFile))) {
+ allocations.add((LocalSlotAllocationSnapshot) ois.readObject());
+ } catch (IOException | ClassNotFoundException e) {
+ log.debug(
+ "Cannot read the local allocations state file {}.",
+ allocationFile.toPath(),
+ e);
+ }
+ }
+
+ log.debug("Resolved allocation files {}.", allocations);
+
+ for (LocalSlotAllocationSnapshot allocationSnapshot : allocations) {
+ try {
+ allocateSlot(
+ allocationSnapshot.getSlotID(),
+ allocationSnapshot.getJobId(),
+ allocationSnapshot.getAllocationId(),
+ allocationSnapshot.getResourceProfile());
+
+ jobTable.getOrCreateJob(
+ allocationSnapshot.getJobId(),
+ () ->
+ registerNewJobAndCreateServices(
+ allocationSnapshot.getJobId(),
+ allocationSnapshot.getJobTargetAddress()));
+ } catch (Exception e) {
+ log.debug("Cannot reallocate restored slot {}.", allocationSnapshot, e);
Review comment:
I will see whether I can unify both methods. I think this is a good
improvement.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org