This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 990589c30a73a27412e2f8c7268c0c6448344993
Author: Till Rohrmann <trohrm...@apache.org>
AuthorDate: Wed Dec 29 16:50:54 2021 +0100

    [FLINK-25817] Add LocalRecoveryITCase

    Adds an integration test for local recovery under process faults, using
    the newly introduced working directory feature to persist state locally.
---
 .../flink/test/recovery/LocalRecoveryITCase.java | 347 +++++++++++++++++++++
 1 file changed, 347 insertions(+)
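For context, a minimal sketch of the two configuration options the test relies
on (the option names are taken from the test below; the base path and the
surrounding setup are illustrative, not part of the commit):

    // Sketch: enable working-directory-based local recovery on a cluster.
    Configuration configuration = new Configuration();
    // Base directory under which each Flink process creates its working
    // directory; state persisted there survives a process restart.
    configuration.set(ClusterOptions.PROCESS_WORKING_DIR_BASE, "/path/to/working-dir-base");
    // Let tasks restore from the locally persisted copy of their state.
    configuration.set(CheckpointingOptions.LOCAL_RECOVERY, true);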
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/LocalRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/LocalRecoveryITCase.java
new file mode 100644
index 0000000..108cf2f
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/LocalRecoveryITCase.java
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.recovery;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.ClusterOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HeartbeatManagerOptions;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint;
+import org.apache.flink.runtime.rest.RestClient;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistics;
+import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatisticsHeaders;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.test.recovery.utils.TaskExecutorProcessEntryPoint;
+import org.apache.flink.test.util.TestProcessBuilder;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLoggerExtension;
+import org.apache.flink.util.concurrent.Executors;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests local recovery by restarting Flink processes. */
+@ExtendWith(TestLoggerExtension.class)
+class LocalRecoveryITCase {
+
+    @TempDir private File tmpDirectory;
+
+    @Test
+    public void testRecoverLocallyFromProcessCrashWithWorkingDirectory() throws Exception {
+        final Configuration configuration = new Configuration();
+        configuration.set(JobManagerOptions.ADDRESS, "localhost");
+        configuration.set(JobManagerOptions.PORT, 0);
+        configuration.set(RestOptions.BIND_PORT, "0");
+        configuration.set(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, 10000L);
+        configuration.set(HeartbeatManagerOptions.HEARTBEAT_INTERVAL, 1000L);
+        configuration.set(HeartbeatManagerOptions.HEARTBEAT_RPC_FAILURE_THRESHOLD, 1);
+        configuration.set(ClusterOptions.PROCESS_WORKING_DIR_BASE, tmpDirectory.getAbsolutePath());
+        configuration.set(CheckpointingOptions.LOCAL_RECOVERY, true);
+        configuration.set(TaskManagerOptions.SLOT_TIMEOUT, Duration.ofSeconds(30L));
+
+        final int parallelism = 3;
+
+        boolean success = false;
+        Collection<TaskManagerProcess> taskManagerProcesses = Collections.emptyList();
+        try (final StandaloneSessionClusterEntrypoint clusterEntrypoint =
+                new StandaloneSessionClusterEntrypoint(configuration)) {
+            clusterEntrypoint.startCluster();
+
+            final Configuration configurationTemplate = new Configuration(configuration);
+            configurationTemplate.set(JobManagerOptions.PORT, clusterEntrypoint.getRpcPort());
+            taskManagerProcesses = startTaskManagerProcesses(parallelism, configurationTemplate);
+
+            final JobClient jobClient = submitJob(parallelism, clusterEntrypoint);
+
+            final long waitingTimeInSeconds = 45L;
+            waitUntilCheckpointCompleted(
+                    configuration,
+                    clusterEntrypoint.getRestPort(),
+                    jobClient.getJobID(),
+                    Deadline.fromNow(Duration.ofSeconds(waitingTimeInSeconds)));
+
+            restartTaskManagerProcesses(taskManagerProcesses, parallelism - 1);
+
+            jobClient.getJobExecutionResult().get(waitingTimeInSeconds, TimeUnit.SECONDS);
+
+            success = true;
+        } finally {
+            if (!success) {
+                for (TaskManagerProcess taskManagerProcess : taskManagerProcesses) {
+                    printLogOutput(taskManagerProcess);
+                }
+            }
+
+            for (TaskManagerProcess taskManagerProcess : taskManagerProcesses) {
+                taskManagerProcess.terminate();
+            }
+        }
+    }
+
+    private static void printLogOutput(TaskManagerProcess taskManagerProcess) {
+        for (TestProcessBuilder.TestProcess testProcess : taskManagerProcess.getProcessHistory()) {
+            AbstractTaskManagerProcessFailureRecoveryTest.printProcessLog(
+                    taskManagerProcess.getName(), testProcess);
+        }
+    }
+
+    private static void restartTaskManagerProcesses(
+            Collection<TaskManagerProcess> taskManagerProcesses, int numberRestarts)
+            throws IOException, InterruptedException {
+        Preconditions.checkArgument(numberRestarts <= taskManagerProcesses.size());
+
+        final Iterator<TaskManagerProcess> iterator = taskManagerProcesses.iterator();
+
+        for (int i = 0; i < numberRestarts; i++) {
+            iterator.next().restartProcess(createTaskManagerProcessBuilder());
+        }
+    }
+
+    private static Collection<TaskManagerProcess> startTaskManagerProcesses(
+            int numberTaskManagers, Configuration configurationTemplate) throws IOException {
+        final Collection<TaskManagerProcess> result = new ArrayList<>();
+
+        for (int i = 0; i < numberTaskManagers; i++) {
+            final Configuration effectiveConfiguration = new Configuration(configurationTemplate);
+            effectiveConfiguration.set(
+                    TaskManagerOptions.TASK_MANAGER_RESOURCE_ID, "taskManager_" + i);
+
+            final TestProcessBuilder.TestProcess process =
+                    startTaskManagerProcess(effectiveConfiguration);
+
+            result.add(new TaskManagerProcess(effectiveConfiguration, process));
+        }
+
+        return result;
+    }
+
+    private static TestProcessBuilder.TestProcess startTaskManagerProcess(
+            Configuration effectiveConfiguration) throws IOException {
+        final TestProcessBuilder taskManagerProcessBuilder = createTaskManagerProcessBuilder();
+        taskManagerProcessBuilder.addConfigAsMainClassArgs(effectiveConfiguration);
+
+        final TestProcessBuilder.TestProcess process = taskManagerProcessBuilder.start();
+        return process;
+    }
+
+    @Nonnull
+    private static TestProcessBuilder createTaskManagerProcessBuilder() throws IOException {
+        return new TestProcessBuilder(TaskExecutorProcessEntryPoint.class.getName());
+    }
+
+    private static class TaskManagerProcess {
+        private final Configuration configuration;
+        private final List<TestProcessBuilder.TestProcess> processHistory;
+
+        private TaskManagerProcess(
+                Configuration configuration, TestProcessBuilder.TestProcess process) {
+            this.configuration = configuration;
+            this.processHistory = new ArrayList<>();
+            processHistory.add(process);
+        }
+
+        Iterable<TestProcessBuilder.TestProcess> getProcessHistory() {
+            return processHistory;
+        }
+
+        void restartProcess(TestProcessBuilder builder) throws IOException, InterruptedException {
+            final TestProcessBuilder.TestProcess runningProcess = getRunningProcess();
+            runningProcess.destroy();
+            runningProcess.getProcess().waitFor();
+
+            builder.addConfigAsMainClassArgs(configuration);
+            final TestProcessBuilder.TestProcess restartedProcess = builder.start();
+
+            processHistory.add(restartedProcess);
+        }
+
+        private TestProcessBuilder.TestProcess getRunningProcess() {
+            return processHistory.get(processHistory.size() - 1);
+        }
+
+        public String getName() {
+            return configuration.get(TaskManagerOptions.TASK_MANAGER_RESOURCE_ID);
+        }
+
+        public void terminate() {
+            getRunningProcess().destroy();
+        }
+    }
+
+    private void waitUntilCheckpointCompleted(
+            Configuration configuration, int restPort, JobID jobId, Deadline deadline)
+            throws Exception {
+        final RestClient restClient = new RestClient(configuration, Executors.directExecutor());
+        final JobMessageParameters messageParameters = new JobMessageParameters();
+        messageParameters.jobPathParameter.resolve(jobId);
+
+        CommonTestUtils.waitUntilCondition(
+                () -> {
+                    final CheckpointingStatistics checkpointingStatistics =
+                            restClient
+                                    .sendRequest(
+                                            "localhost",
+                                            restPort,
+                                            CheckpointingStatisticsHeaders.getInstance(),
+                                            messageParameters,
+                                            EmptyRequestBody.getInstance())
+                                    .join();
+                    return checkpointingStatistics.getCounts().getNumberCompletedCheckpoints() > 0;
+                },
+                deadline);
+    }
+
+    private JobClient submitJob(
+            int parallelism, StandaloneSessionClusterEntrypoint clusterEntrypoint)
+            throws Exception {
+        final StreamExecutionEnvironment env =
+                StreamExecutionEnvironment.createRemoteEnvironment(
+                        "localhost", clusterEntrypoint.getRestPort(), new Configuration());
+        env.setParallelism(parallelism);
+
+        env.enableCheckpointing(100, CheckpointingMode.EXACTLY_ONCE);
+
+        env.addSource(new LocalRecoverySource()).keyBy(x -> x).addSink(new DiscardingSink<>());
+        final JobClient jobClient = env.executeAsync();
+        return jobClient;
+    }
+
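+    // LocalRecoverySource records, per subtask, the AllocationID it runs on.
+    // When the job restarts after the TaskManager processes are killed, it
+    // asserts that each subtask is redeployed to the same AllocationID as
+    // before the crash, i.e. that recovery actually happened locally.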
+    private static class LocalRecoverySource extends RichParallelSourceFunction<Integer>
+            implements CheckpointedFunction {
+        private volatile boolean running = true;
+
+        private transient ListState<TaskNameAllocationID> previousAllocations;
+
+        @Override
+        public void run(SourceContext<Integer> ctx) throws Exception {
+            while (running) {
+                synchronized (ctx.getCheckpointLock()) {
+                    ctx.collect(1);
+                }
+
+                Thread.sleep(5L);
+            }
+        }
+
+        @Override
+        public void cancel() {
+            running = false;
+        }
+
+        @Override
+        public void snapshotState(FunctionSnapshotContext context) throws Exception {}
+
+        @Override
+        public void initializeState(FunctionInitializationContext context) throws Exception {
+            StreamingRuntimeContext runtimeContext = (StreamingRuntimeContext) getRuntimeContext();
+            String allocationId = runtimeContext.getAllocationIDAsString();
+            // Pattern of the name: "Flat Map -> Sink: Unnamed (4/4)#0". Remove "#0" part:
+            String myName = runtimeContext.getTaskNameWithSubtasks().split("#")[0];
+
+            ListStateDescriptor<TaskNameAllocationID> previousAllocationsStateDescriptor =
+                    new ListStateDescriptor<>("sourceState", TaskNameAllocationID.class);
+            previousAllocations =
+                    context.getOperatorStateStore()
+                            .getUnionListState(previousAllocationsStateDescriptor);
+
+            if (context.isRestored()) {
+                final Iterable<TaskNameAllocationID> taskNameAllocationIds =
+                        previousAllocations.get();
+
+                Optional<TaskNameAllocationID> optionalMyTaskNameAllocationId = Optional.empty();
+
+                for (TaskNameAllocationID taskNameAllocationId : taskNameAllocationIds) {
+                    if (taskNameAllocationId.getName().equals(myName)) {
+                        optionalMyTaskNameAllocationId = Optional.of(taskNameAllocationId);
+                        break;
+                    }
+                }
+
+                final TaskNameAllocationID myTaskNameAllocationId =
+                        optionalMyTaskNameAllocationId.orElseThrow(
+                                () ->
+                                        new IllegalStateException(
+                                                "Could not find corresponding TaskNameAllocationID information."));
+
+                assertThat(myTaskNameAllocationId.getAllocationId())
+                        .withFailMessage(
+                                "The task was deployed to AllocationID(%s) but it should have been deployed to AllocationID(%s) for local recovery.",
+                                allocationId, myTaskNameAllocationId.getAllocationId())
+                        .isEqualTo(allocationId);
+                // terminate
+                running = false;
+            }
+
+            previousAllocations.clear();
+            previousAllocations.add(new TaskNameAllocationID(myName, allocationId));
+        }
+    }
+
+    private static class TaskNameAllocationID {
+        private final String name;
+        private final String allocationId;
+
+        private TaskNameAllocationID(String name, String allocationId) {
+            this.name = name;
+            this.allocationId = allocationId;
+        }
+
+        public String getName() {
+            return name;
+        }
+
+        public String getAllocationId() {
+            return allocationId;
+        }
+    }
+}