This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit f95a0c7e867b7af43be86701a6586f39e73da00f Author: slinkydeveloper <francescogu...@gmail.com> AuthorDate: Wed Nov 17 18:15:51 2021 +0100 [FLINK-25817] TaskSlotTable persists on local filesystem the slot allocation ids Signed-off-by: slinkydeveloper <francescogu...@gmail.com> --- .../flink/runtime/taskexecutor/TaskExecutor.java | 133 +++++++++++++++++++++ .../slot/LocalSlotAllocationSnapshot.java | 86 +++++++++++++ 2 files changed, 219 insertions(+) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index fa8f7d0..ace71f0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -115,6 +115,7 @@ import org.apache.flink.runtime.taskexecutor.rpc.RpcKvStateRegistryListener; import org.apache.flink.runtime.taskexecutor.rpc.RpcPartitionStateChecker; import org.apache.flink.runtime.taskexecutor.rpc.RpcResultPartitionConsumableNotifier; import org.apache.flink.runtime.taskexecutor.rpc.RpcTaskOperatorEventGateway; +import org.apache.flink.runtime.taskexecutor.slot.LocalSlotAllocationSnapshot; import org.apache.flink.runtime.taskexecutor.slot.SlotActions; import org.apache.flink.runtime.taskexecutor.slot.SlotNotActiveException; import org.apache.flink.runtime.taskexecutor.slot.SlotNotFoundException; @@ -129,6 +130,7 @@ import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation; import org.apache.flink.runtime.webmonitor.threadinfo.ThreadInfoSamplesRequest; import org.apache.flink.types.SerializableOptional; import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FileUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.OptionalConsumer; import org.apache.flink.util.Preconditions; @@ -145,8 +147,11 @@ import javax.annotation.Nullable; import 
java.io.File; import java.io.FileInputStream; +import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Arrays; @@ -236,6 +241,9 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { private final LeaderRetrievalService resourceManagerLeaderRetriever; + /** Directory to read/write allocation files. */ + private final File allocationsDirectory; + // ------------------------------------------------------------------------ private final HardwareDescription hardwareDescription; @@ -334,6 +342,8 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { ScheduledExecutorService sampleExecutor = Executors.newSingleThreadScheduledExecutor(sampleThreadFactory); this.threadInfoSampleService = new ThreadInfoSampleService(sampleExecutor); + + this.allocationsDirectory = null; } private HeartbeatManager<Void, TaskExecutorHeartbeatPayload> @@ -422,6 +432,8 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { new FileCache( taskManagerConfiguration.getTmpDirectories(), taskExecutorBlobService.getPermanentBlobService()); + + tryLoadLocalAllocationSnapshots(); } catch (Exception e) { handleStartTaskExecutorServicesException(e); } @@ -1066,6 +1078,8 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { return FutureUtils.completedExceptionally(new TaskManagerException(message)); } + tryPersistAllocationSnapshot(slotId, jobId, targetAddress, allocationId, resourceProfile); + try { allocateSlot(slotId, jobId, allocationId, resourceProfile); } catch (SlotAllocationException sae) { @@ -1901,6 +1915,8 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { final int slotIndex = taskSlotTable.freeSlot(allocationId, cause); + tryCleanupSlotAllocationSnapshot(slotIndex); + if 
(slotIndex != -1) { if (isConnectedToResourceManager()) { @@ -2015,6 +2031,123 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { } } + private void tryPersistAllocationSnapshot( + SlotID slotId, + JobID jobId, + String jobTargetAddress, + AllocationID allocationId, + ResourceProfile resourceProfile) { + if (!allocationsDirectory.exists() && !allocationsDirectory.mkdirs()) { + log.debug( + "Allocations directory {} doesn't exist and cannot be created.", + allocationsDirectory.toPath()); + return; + } + + // Let's try to write the slot allocations on file + final File slotAllocationSnapshotFile = slotAllocationFile(slotId.getSlotNumber()); + try (ObjectOutputStream oos = + new ObjectOutputStream(new FileOutputStream(slotAllocationSnapshotFile))) { + oos.writeObject( + new LocalSlotAllocationSnapshot( + slotId, jobId, jobTargetAddress, allocationId, resourceProfile)); + log.debug( + "Successfully written allocation state metadata file {} for job {} and allocation {}.", + slotAllocationSnapshotFile.toPath(), + jobId, + allocationId); + } catch (IOException e) { + log.debug( + "Cannot write the local allocations state. 
File {} for job {} and allocation {}.", + slotAllocationSnapshotFile.toPath(), + jobId, + allocationId, + e); + } + } + + private void tryCleanupSlotAllocationSnapshot(int requestedIndex) { + if (!allocationsDirectory.exists()) { + log.debug( + "There is no local allocations snapshot directory to cleanup {}.", + allocationsDirectory.toPath()); + return; + } + + // Let's try to delete the slot allocation snapshot file + final File slotAllocationSnapshotFile = slotAllocationFile(requestedIndex); + try { + FileUtils.deleteFileOrDirectory(slotAllocationSnapshotFile); + log.debug( + "Successfully deleted allocation state metadata file {}.", + slotAllocationSnapshotFile.toPath()); + } catch (IOException e) { + log.debug( + "Cannot delete the local allocations state file {}.", + slotAllocationSnapshotFile.toPath(), + e); + } + } + + private File slotAllocationFile(int slotIndex) { + return new File(allocationsDirectory.getAbsolutePath(), slotIndex + ".bin"); + } + + /** + * This method tries to repopulate the {@link JobTable} and {@link TaskSlotTable} from the local + * filesystem in a best-effort manner. 
+ */ + private void tryLoadLocalAllocationSnapshots() { + if (!allocationsDirectory.exists()) { + log.debug( + "There is no local allocations snapshot directory to load from {}.", + allocationsDirectory.toPath()); + return; + } + + // Let's try to populate the slot allocation from local file + final File[] slotAllocationFiles = allocationsDirectory.listFiles(); + if (slotAllocationFiles == null) { + log.debug("No allocation files to load."); + return; + } + + List<LocalSlotAllocationSnapshot> allocations = new ArrayList<>(slotAllocationFiles.length); + + for (File allocationFile : slotAllocationFiles) { + try (ObjectInputStream ois = + new ObjectInputStream(new FileInputStream(allocationFile))) { + allocations.add((LocalSlotAllocationSnapshot) ois.readObject()); + } catch (IOException | ClassNotFoundException e) { + log.debug( + "Cannot read the local allocations state file {}.", + allocationFile.toPath(), + e); + } + } + + log.debug("Resolved allocation files {}.", allocations); + + for (LocalSlotAllocationSnapshot allocationSnapshot : allocations) { + try { + allocateSlot( + allocationSnapshot.getSlotID(), + allocationSnapshot.getJobId(), + allocationSnapshot.getAllocationId(), + allocationSnapshot.getResourceProfile()); + + jobTable.getOrCreateJob( + allocationSnapshot.getJobId(), + () -> + registerNewJobAndCreateServices( + allocationSnapshot.getJobId(), + allocationSnapshot.getJobTargetAddress())); + } catch (Exception e) { + log.debug("Cannot reallocate restored slot {}.", allocationSnapshot, e); + } + } + } + // ------------------------------------------------------------------------ // Internal utility methods // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/LocalSlotAllocationSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/LocalSlotAllocationSnapshot.java new file mode 100644 index 0000000..c47e5ba --- 
/dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/LocalSlotAllocationSnapshot.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskexecutor.slot; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.SlotID; + +/** Model to save local slot allocation info. 
*/ +public class LocalSlotAllocationSnapshot implements java.io.Serializable { + + private static final long serialVersionUID = 1L; + + private final SlotID slotID; + private final JobID jobId; + private final String jobTargetAddress; + private final AllocationID allocationId; + private final ResourceProfile resourceProfile; + + public LocalSlotAllocationSnapshot( + SlotID slotID, + JobID jobId, + String jobTargetAddress, + AllocationID allocationId, + ResourceProfile resourceProfile) { + this.slotID = slotID; + this.jobId = jobId; + this.jobTargetAddress = jobTargetAddress; + this.allocationId = allocationId; + this.resourceProfile = resourceProfile; + } + + public SlotID getSlotID() { + return slotID; + } + + public JobID getJobId() { + return jobId; + } + + public String getJobTargetAddress() { + return jobTargetAddress; + } + + public AllocationID getAllocationId() { + return allocationId; + } + + public ResourceProfile getResourceProfile() { + return resourceProfile; + } + + @Override + public String toString() { + return "LocalSlotAllocationSnapshot{" + + "slotID=" + + slotID + + ", jobId=" + + jobId + + ", jobTargetAddress='" + + jobTargetAddress + + '\'' + + ", allocationId=" + + allocationId + + ", resourceProfile=" + + resourceProfile + + '}'; + } +}