zentol commented on a change in pull request #18237:
URL: https://github.com/apache/flink/pull/18237#discussion_r799486723



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
##########
@@ -2015,6 +2031,123 @@ private void freeNoLongerUsedSlots(AllocatedSlotReport 
allocatedSlotReport) {
         }
     }
 
+    private void tryPersistAllocationSnapshot(
+            SlotID slotId,
+            JobID jobId,
+            String jobTargetAddress,
+            AllocationID allocationId,
+            ResourceProfile resourceProfile) {
+        if (!allocationsDirectory.exists() && !allocationsDirectory.mkdirs()) {
+            log.debug(
+                    "Allocations directory {} doesn't exist and cannot be 
created.",
+                    allocationsDirectory.toPath());
+            return;
+        }
+
+        // Let's try to write the slot allocations on file
+        final File slotAllocationSnapshotFile = 
slotAllocationFile(slotId.getSlotNumber());
+        try (ObjectOutputStream oos =
+                new ObjectOutputStream(new 
FileOutputStream(slotAllocationSnapshotFile))) {
+            oos.writeObject(
+                    new LocalSlotAllocationSnapshot(
+                            slotId, jobId, jobTargetAddress, allocationId, 
resourceProfile));
+            log.debug(
+                    "Successfully written allocation state metadata file {} 
for job {} and allocation {}.",
+                    slotAllocationSnapshotFile.toPath(),
+                    jobId,
+                    allocationId);
+        } catch (IOException e) {
+            log.debug(
+                    "Cannot write the local allocations state. File {} for job 
{} and allocation {}.",
+                    slotAllocationSnapshotFile.toPath(),
+                    jobId,
+                    allocationId,
+                    e);
+        }
+    }
+
+    private void tryCleanupSlotAllocationSnapshot(int requestedIndex) {
+        if (!allocationsDirectory.exists()) {
+            log.debug(
+                    "There is no local allocations snapshot directory to 
cleanup {}.",
+                    allocationsDirectory.toPath());
+            return;
+        }
+
+        // Let's try to write the slot allocations on file
+        final File slotAllocationSnapshotFile = 
slotAllocationFile(requestedIndex);
+        try {
+            FileUtils.deleteFileOrDirectory(slotAllocationSnapshotFile);
+            log.debug(
+                    "Successfully deleted allocation state metadata file {}.",
+                    slotAllocationSnapshotFile.toPath());
+        } catch (IOException e) {
+            log.debug(
+                    "Cannot delete the local allocations state file {}.",
+                    slotAllocationSnapshotFile.toPath(),
+                    e);
+        }
+    }
+
+    private File slotAllocationFile(int slotIndex) {
+        return new File(allocationsDirectory.getAbsolutePath(), slotIndex + 
".bin");
+    }
+
+    /**
+     * This method tries to repopulate the {@link JobTable} and {@link 
TaskSlotTable} from the local
+     * filesystem in a best-effort manner.
+     */
+    private void tryLoadLocalAllocationSnapshots() {
+        if (!allocationsDirectory.exists()) {
+            log.debug(
+                    "There is no local allocations snapshot directory to load 
from {}.",
+                    allocationsDirectory.toPath());
+            return;
+        }
+
+        // Let's try to populate the slot allocation from local file
+        final File[] slotAllocationFiles = allocationsDirectory.listFiles();
+        if (slotAllocationFiles == null) {
+            log.debug("No allocation files to load.");
+            return;
+        }
+
+        List<LocalSlotAllocationSnapshot> allocations = new 
ArrayList<>(slotAllocationFiles.length);
+
+        for (File allocationFile : slotAllocationFiles) {
+            try (ObjectInputStream ois =
+                    new ObjectInputStream(new 
FileInputStream(allocationFile))) {
+                allocations.add((LocalSlotAllocationSnapshot) 
ois.readObject());
+            } catch (IOException | ClassNotFoundException e) {
+                log.debug(
+                        "Cannot read the local allocations state file {}.",
+                        allocationFile.toPath(),
+                        e);
+            }
+        }
+
+        log.debug("Resolved allocation files {}.", allocations);
+
+        for (LocalSlotAllocationSnapshot allocationSnapshot : allocations) {
+            try {
+                allocateSlot(
+                        allocationSnapshot.getSlotID(),
+                        allocationSnapshot.getJobId(),
+                        allocationSnapshot.getAllocationId(),
+                        allocationSnapshot.getResourceProfile());
+
+                jobTable.getOrCreateJob(
+                        allocationSnapshot.getJobId(),
+                        () ->
+                                registerNewJobAndCreateServices(
+                                        allocationSnapshot.getJobId(),
+                                        
allocationSnapshot.getJobTargetAddress()));
+            } catch (Exception e) {
+                log.debug("Cannot reallocate restored slot {}.", 
allocationSnapshot, e);

Review comment:
       The exception handling here differs from `requestSlot`; is that intentional? More generally, I'm concerned about having to keep these two code paths in sync.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
##########
@@ -32,16 +32,25 @@
 import org.apache.flink.runtime.taskexecutor.TaskManagerServicesConfiguration;
 import org.apache.flink.runtime.testutils.WorkingDirectoryResource;
 import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.Reference;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.concurrent.Executors;
 
+import org.apache.flink.shaded.curator4.com.google.common.collect.Sets;

Review comment:
       This shouldn't use the shaded Curator import (`org.apache.flink.shaded.curator4.com.google.common.collect.Sets`).

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/recovery/LocalRecoveryITCase.java
##########
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.recovery;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.ClusterOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HeartbeatManagerOptions;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint;
+import org.apache.flink.runtime.rest.RestClient;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import 
org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistics;
+import 
org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatisticsHeaders;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.test.recovery.utils.TaskExecutorProcessEntryPoint;
+import org.apache.flink.test.util.TestProcessBuilder;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLoggerExtension;
+import org.apache.flink.util.concurrent.Executors;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests local recovery by restarting Flink processes. */
+@ExtendWith(TestLoggerExtension.class)
+public class LocalRecoveryITCase {

Review comment:
       FYI: in JUnit 5, test classes and methods can be package-private.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorRecoveryTest.java
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.entrypoint.WorkingDirectory;
+import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
+import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
+import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
+import org.apache.flink.runtime.messages.Acknowledge;
+import 
org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.rpc.TestingRpcServiceResource;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.testutils.WorkingDirectoryResource;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Rule;

Review comment:
       This should use JUnit 5 instead of the JUnit 4 `@ClassRule`/`@Rule` API.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
##########
@@ -176,14 +186,52 @@ public void storeLocalState(
         }
     }
 
+    /**
+     * Writes a task state snapshot file that contains the serialized content 
of the local state.
+     *
+     * @param checkpointId identifying the checkpoint
+     * @param localState task state snapshot that will be persisted
+     */
+    private void persistLocalStateMetadata(long checkpointId, 
TaskStateSnapshot localState) {
+        final File taskStateSnapshotFile = 
getTaskStateSnapshotFile(checkpointId);
+        try (ObjectOutputStream oos =
+                new ObjectOutputStream(new 
FileOutputStream(taskStateSnapshotFile))) {
+            oos.writeObject(localState);
+
+            LOG.debug(
+                    "Successfully written local task state snapshot file {} 
for checkpoint {}.",
+                    taskStateSnapshotFile,
+                    checkpointId);
+        } catch (IOException e) {
+            ExceptionUtils.rethrow(e, "Could not write the local task state 
snapshot file.");
+        }
+    }
+
+    @VisibleForTesting
+    @Nonnull
+    File getTaskStateSnapshotFile(long checkpointId) {
+        final File checkpointDirectory =
+                localRecoveryConfig
+                        .getLocalStateDirectoryProvider()
+                        .subtaskSpecificCheckpointDirectory(checkpointId);

Review comment:
       On master, `getLocalStateDirectoryProvider()` returns an `Optional`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to