zhuzhurk commented on code in PR #24771:
URL: https://github.com/apache/flink/pull/24771#discussion_r1612975806


##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/BatchJobRecoveryTest.java:
##########
@@ -0,0 +1,1217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.eventtime.WatermarkAlignmentParams;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.mocks.MockSource;
+import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
+import org.apache.flink.api.connector.source.mocks.MockSplitEnumerator;
+import org.apache.flink.configuration.BatchExecutionOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.DefaultExecutionGraph;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
+import org.apache.flink.runtime.executiongraph.InternalExecutionGraphAccessor;
+import org.apache.flink.runtime.executiongraph.ResultPartitionBytes;
+import 
org.apache.flink.runtime.executiongraph.TestingComponentMainThreadExecutor;
+import 
org.apache.flink.runtime.executiongraph.failover.FixedDelayRestartBackoffTimeStrategy;
+import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
+import 
org.apache.flink.runtime.io.network.partition.JobMasterPartitionTrackerImpl;
+import 
org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobmaster.event.ExecutionVertexFinishedEvent;
+import org.apache.flink.runtime.jobmaster.event.FileSystemJobEventStore;
+import org.apache.flink.runtime.jobmaster.event.JobEvent;
+import org.apache.flink.runtime.jobmaster.event.JobEventManager;
+import org.apache.flink.runtime.jobmaster.event.JobEventStore;
+import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
+import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
+import org.apache.flink.runtime.operators.coordination.EventReceivingTasks;
+import 
org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder;
+import 
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator;
+import 
org.apache.flink.runtime.operators.coordination.TestingOperatorCoordinator;
+import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
+import org.apache.flink.runtime.scheduler.SchedulerBase;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
+import org.apache.flink.runtime.shuffle.DefaultShuffleMetrics;
+import org.apache.flink.runtime.shuffle.JobShuffleContextImpl;
+import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor;
+import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
+import org.apache.flink.runtime.shuffle.PartitionWithMetrics;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.shuffle.ShuffleMetrics;
+import org.apache.flink.runtime.source.coordinator.SourceCoordinator;
+import org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider;
+import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import 
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
+import org.apache.flink.testutils.junit.utils.TempDirUtils;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.util.concurrent.ScheduledExecutor;
+import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
+import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+
+import javax.annotation.Nonnull;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilExecutionVertexState;
+import static 
org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder.createCustomParallelismDecider;
+import static org.apache.flink.util.Preconditions.checkState;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.fail;
+
+/** Test for batch job recovery. */
+@ExtendWith(ParameterizedTestExtension.class)
+public class BatchJobRecoveryTest {
+
+    private final Duration previousWorkerRecoveryTimeout = 
Duration.ofSeconds(1);
+
+    @TempDir private java.nio.file.Path temporaryFolder;
+
+    // ---- Mocks for the underlying Operator Coordinator Context ---
+    protected EventReceivingTasks receivingTasks;
+
+    @RegisterExtension
+    static final TestExecutorExtension<ScheduledExecutorService> 
EXECUTOR_RESOURCE =
+            TestingUtils.defaultExecutorExtension();
+
+    @RegisterExtension
+    static final TestingComponentMainThreadExecutor.Extension 
MAIN_EXECUTOR_RESOURCE =
+            new TestingComponentMainThreadExecutor.Extension();
+
+    private final TestingComponentMainThreadExecutor mainThreadExecutor =
+            MAIN_EXECUTOR_RESOURCE.getComponentMainThreadTestExecutor();
+
+    private ScheduledExecutor delayedExecutor =
+            new 
ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor());
+
+    private static final OperatorID OPERATOR_ID = new OperatorID(1234L, 5678L);
+    private static final int NUM_SPLITS = 10;
+    private static final int SOURCE_PARALLELISM = 5;
+    private static final int MIDDLE_PARALLELISM = 5;
+    private static final int DECIDED_SINK_PARALLELISM = 2;
+    private static final JobVertexID SOURCE_ID = new JobVertexID();
+    private static final JobVertexID MIDDLE_ID = new JobVertexID();
+    private static final JobVertexID SINK_ID = new JobVertexID();
+    private static final JobID JOB_ID = new JobID();
+
+    private SourceCoordinatorProvider<MockSourceSplit> provider;
+    private FileSystemJobEventStore jobEventStore;
+    private List<JobEvent> persistedJobEventList;
+
+    private byte[] serializedJobGraph;
+
+    private final Collection<PartitionWithMetrics> allPartitionWithMetrics = 
new ArrayList<>();
+
+    @Parameter public boolean enableSpeculativeExecution;
+
+    @Parameters(name = "enableSpeculativeExecution={0}")
+    public static Collection<Boolean> parameters() {
+        return Arrays.asList(false, true);
+    }
+
+    @BeforeEach
+    void setUp() throws IOException {
+        final Path rootPath = new 
Path(TempDirUtils.newFolder(temporaryFolder).getAbsolutePath());
+        delayedExecutor = new 
ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor());
+        receivingTasks = EventReceivingTasks.createForRunningTasks();
+        persistedJobEventList = new ArrayList<>();
+        jobEventStore =
+                new TestingFileSystemJobEventStore(
+                        rootPath, new Configuration(), persistedJobEventList);
+
+        provider =
+                new SourceCoordinatorProvider<>(
+                        "AdaptiveBatchSchedulerTest",
+                        OPERATOR_ID,
+                        new MockSource(Boundedness.BOUNDED, NUM_SPLITS),
+                        1,
+                        WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED,
+                        null);
+
+        this.serializedJobGraph = serializeJobGraph(createDefaultJobGraph());
+        allPartitionWithMetrics.clear();
+    }
+
+    @AfterEach
+    void after() {
+        jobEventStore.stop(true);
+    }
+
+    @TestTemplate
+    void testOpCoordUnsupportedBatchSnapshotWithJobVertexUnFinished() throws 
Exception {

Review Comment:
   Could you add some comments for each case, covering:
   1. the old execution status
   2. what unexpected events happen during recovery, e.g. expected partitions
      are not fetched, log replay fails, etc.
   3. the expected recovered status



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/BatchJobRecoveryTest.java:
##########
@@ -0,0 +1,1217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.eventtime.WatermarkAlignmentParams;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.mocks.MockSource;
+import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
+import org.apache.flink.api.connector.source.mocks.MockSplitEnumerator;
+import org.apache.flink.configuration.BatchExecutionOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.DefaultExecutionGraph;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
+import org.apache.flink.runtime.executiongraph.InternalExecutionGraphAccessor;
+import org.apache.flink.runtime.executiongraph.ResultPartitionBytes;
+import 
org.apache.flink.runtime.executiongraph.TestingComponentMainThreadExecutor;
+import 
org.apache.flink.runtime.executiongraph.failover.FixedDelayRestartBackoffTimeStrategy;
+import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
+import 
org.apache.flink.runtime.io.network.partition.JobMasterPartitionTrackerImpl;
+import 
org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobmaster.event.ExecutionVertexFinishedEvent;
+import org.apache.flink.runtime.jobmaster.event.FileSystemJobEventStore;
+import org.apache.flink.runtime.jobmaster.event.JobEvent;
+import org.apache.flink.runtime.jobmaster.event.JobEventManager;
+import org.apache.flink.runtime.jobmaster.event.JobEventStore;
+import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
+import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
+import org.apache.flink.runtime.operators.coordination.EventReceivingTasks;
+import 
org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder;
+import 
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator;
+import 
org.apache.flink.runtime.operators.coordination.TestingOperatorCoordinator;
+import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
+import org.apache.flink.runtime.scheduler.SchedulerBase;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
+import org.apache.flink.runtime.shuffle.DefaultShuffleMetrics;
+import org.apache.flink.runtime.shuffle.JobShuffleContextImpl;
+import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor;
+import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
+import org.apache.flink.runtime.shuffle.PartitionWithMetrics;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.shuffle.ShuffleMetrics;
+import org.apache.flink.runtime.source.coordinator.SourceCoordinator;
+import org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider;
+import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import 
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
+import org.apache.flink.testutils.junit.utils.TempDirUtils;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.util.concurrent.ScheduledExecutor;
+import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
+import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+
+import javax.annotation.Nonnull;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilExecutionVertexState;
+import static 
org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder.createCustomParallelismDecider;
+import static org.apache.flink.util.Preconditions.checkState;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.fail;
+
+/** Test for batch job recovery. */
+@ExtendWith(ParameterizedTestExtension.class)
+public class BatchJobRecoveryTest {
+
+    private final Duration previousWorkerRecoveryTimeout = 
Duration.ofSeconds(1);
+
+    @TempDir private java.nio.file.Path temporaryFolder;
+
+    // ---- Mocks for the underlying Operator Coordinator Context ---
+    protected EventReceivingTasks receivingTasks;
+
+    @RegisterExtension
+    static final TestExecutorExtension<ScheduledExecutorService> 
EXECUTOR_RESOURCE =
+            TestingUtils.defaultExecutorExtension();
+
+    @RegisterExtension
+    static final TestingComponentMainThreadExecutor.Extension 
MAIN_EXECUTOR_RESOURCE =
+            new TestingComponentMainThreadExecutor.Extension();
+
+    private final TestingComponentMainThreadExecutor mainThreadExecutor =
+            MAIN_EXECUTOR_RESOURCE.getComponentMainThreadTestExecutor();
+
+    private ScheduledExecutor delayedExecutor =
+            new 
ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor());
+
+    private static final OperatorID OPERATOR_ID = new OperatorID(1234L, 5678L);
+    private static final int NUM_SPLITS = 10;
+    private static final int SOURCE_PARALLELISM = 5;
+    private static final int MIDDLE_PARALLELISM = 5;
+    private static final int DECIDED_SINK_PARALLELISM = 2;
+    private static final JobVertexID SOURCE_ID = new JobVertexID();
+    private static final JobVertexID MIDDLE_ID = new JobVertexID();
+    private static final JobVertexID SINK_ID = new JobVertexID();
+    private static final JobID JOB_ID = new JobID();
+
+    private SourceCoordinatorProvider<MockSourceSplit> provider;
+    private FileSystemJobEventStore jobEventStore;
+    private List<JobEvent> persistedJobEventList;
+
+    private byte[] serializedJobGraph;
+
+    private final Collection<PartitionWithMetrics> allPartitionWithMetrics = 
new ArrayList<>();
+
+    @Parameter public boolean enableSpeculativeExecution;
+
+    @Parameters(name = "enableSpeculativeExecution={0}")
+    public static Collection<Boolean> parameters() {
+        return Arrays.asList(false, true);
+    }
+
+    @BeforeEach
+    void setUp() throws IOException {
+        final Path rootPath = new 
Path(TempDirUtils.newFolder(temporaryFolder).getAbsolutePath());
+        delayedExecutor = new 
ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor());
+        receivingTasks = EventReceivingTasks.createForRunningTasks();
+        persistedJobEventList = new ArrayList<>();
+        jobEventStore =
+                new TestingFileSystemJobEventStore(
+                        rootPath, new Configuration(), persistedJobEventList);
+
+        provider =
+                new SourceCoordinatorProvider<>(
+                        "AdaptiveBatchSchedulerTest",
+                        OPERATOR_ID,
+                        new MockSource(Boundedness.BOUNDED, NUM_SPLITS),
+                        1,
+                        WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED,
+                        null);
+
+        this.serializedJobGraph = serializeJobGraph(createDefaultJobGraph());
+        allPartitionWithMetrics.clear();
+    }
+
+    @AfterEach
+    void after() {
+        jobEventStore.stop(true);
+    }
+
+    @TestTemplate
+    void testOpCoordUnsupportedBatchSnapshotWithJobVertexUnFinished() throws 
Exception {

Review Comment:
   Could you add some comments for each case, covering:
   1. the old execution status
   2. what unexpected events happen during recovery, e.g. expected partitions
      are not fetched, log replay fails, etc.
   3. the expected recovered status



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/BatchJobRecoveryTest.java:
##########
@@ -0,0 +1,1217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.eventtime.WatermarkAlignmentParams;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.mocks.MockSource;
+import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
+import org.apache.flink.api.connector.source.mocks.MockSplitEnumerator;
+import org.apache.flink.configuration.BatchExecutionOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.DefaultExecutionGraph;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
+import org.apache.flink.runtime.executiongraph.InternalExecutionGraphAccessor;
+import org.apache.flink.runtime.executiongraph.ResultPartitionBytes;
+import 
org.apache.flink.runtime.executiongraph.TestingComponentMainThreadExecutor;
+import 
org.apache.flink.runtime.executiongraph.failover.FixedDelayRestartBackoffTimeStrategy;
+import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
+import 
org.apache.flink.runtime.io.network.partition.JobMasterPartitionTrackerImpl;
+import 
org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobmaster.event.ExecutionVertexFinishedEvent;
+import org.apache.flink.runtime.jobmaster.event.FileSystemJobEventStore;
+import org.apache.flink.runtime.jobmaster.event.JobEvent;
+import org.apache.flink.runtime.jobmaster.event.JobEventManager;
+import org.apache.flink.runtime.jobmaster.event.JobEventStore;
+import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
+import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
+import org.apache.flink.runtime.operators.coordination.EventReceivingTasks;
+import 
org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder;
+import 
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator;
+import 
org.apache.flink.runtime.operators.coordination.TestingOperatorCoordinator;
+import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
+import org.apache.flink.runtime.scheduler.SchedulerBase;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
+import org.apache.flink.runtime.shuffle.DefaultShuffleMetrics;
+import org.apache.flink.runtime.shuffle.JobShuffleContextImpl;
+import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor;
+import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
+import org.apache.flink.runtime.shuffle.PartitionWithMetrics;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.shuffle.ShuffleMetrics;
+import org.apache.flink.runtime.source.coordinator.SourceCoordinator;
+import org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider;
+import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import 
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
+import org.apache.flink.testutils.junit.utils.TempDirUtils;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.util.concurrent.ScheduledExecutor;
+import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
+import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+
+import javax.annotation.Nonnull;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilExecutionVertexState;
+import static 
org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder.createCustomParallelismDecider;
+import static org.apache.flink.util.Preconditions.checkState;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.fail;
+
+/** Test for batch job recovery. */
+@ExtendWith(ParameterizedTestExtension.class)
+public class BatchJobRecoveryTest {
+
+    private final Duration previousWorkerRecoveryTimeout = 
Duration.ofSeconds(1);
+
+    @TempDir private java.nio.file.Path temporaryFolder;
+
+    // ---- Mocks for the underlying Operator Coordinator Context ---
+    protected EventReceivingTasks receivingTasks;
+
+    @RegisterExtension
+    static final TestExecutorExtension<ScheduledExecutorService> 
EXECUTOR_RESOURCE =
+            TestingUtils.defaultExecutorExtension();
+
+    @RegisterExtension
+    static final TestingComponentMainThreadExecutor.Extension 
MAIN_EXECUTOR_RESOURCE =
+            new TestingComponentMainThreadExecutor.Extension();
+
+    private final TestingComponentMainThreadExecutor mainThreadExecutor =
+            MAIN_EXECUTOR_RESOURCE.getComponentMainThreadTestExecutor();
+
+    private ScheduledExecutor delayedExecutor =
+            new 
ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor());
+
+    private static final OperatorID OPERATOR_ID = new OperatorID(1234L, 5678L);
+    private static final int NUM_SPLITS = 10;
+    private static final int SOURCE_PARALLELISM = 5;
+    private static final int MIDDLE_PARALLELISM = 5;
+    private static final int DECIDED_SINK_PARALLELISM = 2;
+    private static final JobVertexID SOURCE_ID = new JobVertexID();
+    private static final JobVertexID MIDDLE_ID = new JobVertexID();
+    private static final JobVertexID SINK_ID = new JobVertexID();
+    private static final JobID JOB_ID = new JobID();
+
+    private SourceCoordinatorProvider<MockSourceSplit> provider;
+    private FileSystemJobEventStore jobEventStore;
+    private List<JobEvent> persistedJobEventList;
+
+    private byte[] serializedJobGraph;
+
+    private final Collection<PartitionWithMetrics> allPartitionWithMetrics = 
new ArrayList<>();
+
+    @Parameter public boolean enableSpeculativeExecution;
+
+    @Parameters(name = "enableSpeculativeExecution={0}")
+    public static Collection<Boolean> parameters() {
+        return Arrays.asList(false, true);
+    }
+
+    /**
+     * Rebuilds the per-test fixtures: a job event store rooted in a fresh temporary folder, the
+     * delayed executor, the source coordinator provider and the serialized default job graph.
+     *
+     * @throws IOException if the temporary folder or the serialized job graph cannot be created
+     */
+    @BeforeEach
+    void setUp() throws IOException {
+        final String eventStoreDir = TempDirUtils.newFolder(temporaryFolder).getAbsolutePath();
+        final Path rootPath = new Path(eventStoreDir);
+
+        this.delayedExecutor = new ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor());
+        this.receivingTasks = EventReceivingTasks.createForRunningTasks();
+
+        // The store receives the list so that tests can inspect it afterwards.
+        this.persistedJobEventList = new ArrayList<>();
+        final Configuration storeConfiguration = new Configuration();
+        this.jobEventStore =
+                new TestingFileSystemJobEventStore(
+                        rootPath, storeConfiguration, persistedJobEventList);
+
+        final MockSource boundedSource = new MockSource(Boundedness.BOUNDED, NUM_SPLITS);
+        this.provider =
+                new SourceCoordinatorProvider<>(
+                        "AdaptiveBatchSchedulerTest",
+                        OPERATOR_ID,
+                        boundedSource,
+                        1,
+                        WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED,
+                        null);
+
+        // Built last, after the provider has been created.
+        this.serializedJobGraph = serializeJobGraph(createDefaultJobGraph());
+        this.allPartitionWithMetrics.clear();
+    }
+
+    @AfterEach
+    void after() {
+        jobEventStore.stop(true);
+    }
+
+    @TestTemplate
+    void testOpCoordUnsupportedBatchSnapshotWithJobVertexUnFinished() throws 
Exception {
+        JobGraph jobGraph = deserializeJobGraph(serializedJobGraph);
+        JobVertex jobVertex = jobGraph.findVertexByID(MIDDLE_ID);
+        jobVertex.addOperatorCoordinator(
+                new SerializedValue<>(
+                        new TestingOperatorCoordinator.Provider(
+                                
jobVertex.getOperatorIDs().get(0).getGeneratedOperatorID())));
+        AdaptiveBatchScheduler scheduler =
+                createScheduler(
+                        jobGraph,
+                        Duration.ZERO /* make sure every finished event can 
flush on time.*/);
+
+        // start scheduling.
+        runInMainThread(scheduler::startScheduling);
+
+        runInMainThread(
+                () -> {
+                    // trigger all source finished
+                    transitionExecutionsState(scheduler, 
ExecutionState.FINISHED, SOURCE_ID);
+                });
+        runInMainThread(
+                () -> {
+                    // trigger first middle finished.
+                    ExecutionVertex firstMiddle =
+                            getExecutionVertex(MIDDLE_ID, 0, 
scheduler.getExecutionGraph());
+                    AdaptiveBatchSchedulerTest.transitionExecutionsState(
+                            scheduler,
+                            ExecutionState.FINISHED,
+                            
Collections.singletonList(firstMiddle.getCurrentExecutionAttempt()),
+                            null);
+                });
+        List<ExecutionAttemptID> sourceExecutions =
+                
getCurrentAttemptIds(scheduler.getExecutionJobVertex(SOURCE_ID));
+        List<ExecutionAttemptID> middleExecutions =
+                
getCurrentAttemptIds(scheduler.getExecutionJobVertex(MIDDLE_ID));
+
+        waitWriteJobFinishedEventCompleted(6);
+        runInMainThread(
+                () -> {
+                    // flush events.
+                    jobEventStore.stop(false);
+                });
+
+        // register partitions, the partition of source task 0 is lost, and it 
will be restarted
+        // if middle task 0 need be restarted.
+        int subtaskIndex = 0;
+        registerPartitions(
+                scheduler,
+                Collections.emptySet(),
+                Collections.singleton(
+                        scheduler
+                                .getExecutionJobVertex(SOURCE_ID)
+                                .getTaskVertices()[subtaskIndex]
+                                .getID()));
+
+        // start a new scheduler and try to recover.
+        AdaptiveBatchScheduler newScheduler = createScheduler(jobGraph);
+        startSchedulingAndWaitRecoverFinish(newScheduler);
+
+        // check source vertices state were recovered.
+        for (ExecutionVertex vertex :
+                getExecutionVertices(SOURCE_ID, 
newScheduler.getExecutionGraph())) {
+            if (vertex.getParallelSubtaskIndex() == subtaskIndex) {
+                
assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.DEPLOYING);
+                continue;
+            }
+
+            // check state.
+            assertThat(sourceExecutions)
+                    
.contains(vertex.getCurrentExecutionAttempt().getAttemptId());
+            
assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.FINISHED);
+
+            // check partition tracker was rebuild.
+            JobMasterPartitionTracker partitionTracker =
+                    ((InternalExecutionGraphAccessor) 
newScheduler.getExecutionGraph())
+                            .getPartitionTracker();
+            List<ResultPartitionID> resultPartitionIds =
+                    vertex.getProducedPartitions().keySet().stream()
+                            .map(
+                                    ((DefaultExecutionGraph) 
newScheduler.getExecutionGraph())
+                                            ::createResultPartitionId)
+                            .collect(Collectors.toList());
+            for (ResultPartitionID partitionID : resultPartitionIds) {
+                
assertThat(partitionTracker.isPartitionTracked(partitionID)).isTrue();
+            }
+        }
+
+        // check middle vertices state were not recovered.
+        for (ExecutionVertex vertex :
+                getExecutionVertices(MIDDLE_ID, 
newScheduler.getExecutionGraph())) {
+            if (vertex.getParallelSubtaskIndex() == subtaskIndex) {
+                
assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.CREATED);
+                continue;
+            }
+
+            assertThat(middleExecutions)
+                    
.doesNotContain(vertex.getCurrentExecutionAttempt().getAttemptId());
+            
assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.DEPLOYING);
+        }
+    }
+
+    @TestTemplate
+    void testOpCoordUnsupportedBatchSnapshotWithJobVertexFinished() throws 
Exception {
+        JobGraph jobGraph = deserializeJobGraph(serializedJobGraph);
+        JobVertex jobVertex = jobGraph.findVertexByID(MIDDLE_ID);
+        jobVertex.addOperatorCoordinator(
+                new SerializedValue<>(
+                        new TestingOperatorCoordinator.Provider(
+                                
jobVertex.getOperatorIDs().get(0).getGeneratedOperatorID())));
+        AdaptiveBatchScheduler scheduler = createScheduler(jobGraph);
+
+        // start scheduling.
+        runInMainThread(scheduler::startScheduling);
+
+        runInMainThread(
+                () -> {
+                    // trigger all source finished
+                    transitionExecutionsState(scheduler, 
ExecutionState.FINISHED, SOURCE_ID);
+                });
+        runInMainThread(
+                () -> {
+                    // trigger all middle finished.
+                    transitionExecutionsState(scheduler, 
ExecutionState.FINISHED, MIDDLE_ID);
+                });
+        List<ExecutionAttemptID> sourceExecutions =
+                
getCurrentAttemptIds(scheduler.getExecutionJobVertex(SOURCE_ID));
+        List<ExecutionAttemptID> middleExecutions =
+                
getCurrentAttemptIds(scheduler.getExecutionJobVertex(MIDDLE_ID));
+
+        waitWriteJobFinishedEventCompleted(10);
+        runInMainThread(
+                () -> {
+                    // flush events.
+                    jobEventStore.stop(false);
+                });
+
+        // register partitions
+        registerPartitions(scheduler);
+
+        // start a new scheduler and try to recover.
+        AdaptiveBatchScheduler newScheduler = createScheduler(jobGraph);
+        startSchedulingAndWaitRecoverFinish(newScheduler);
+
+        // check source vertices state were recovered.
+        for (ExecutionVertex vertex :
+                getExecutionVertices(SOURCE_ID, 
newScheduler.getExecutionGraph())) {
+            // check state.
+            assertThat(sourceExecutions)
+                    
.contains(vertex.getCurrentExecutionAttempt().getAttemptId());
+            
assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.FINISHED);
+        }
+
+        // check middle vertices state were recovered.
+        for (ExecutionVertex vertex :
+                getExecutionVertices(MIDDLE_ID, 
newScheduler.getExecutionGraph())) {
+            // check state.
+            assertThat(middleExecutions)
+                    
.contains(vertex.getCurrentExecutionAttempt().getAttemptId());
+            
assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.FINISHED);
+
+            // check partition tracker was rebuild.
+            JobMasterPartitionTracker partitionTracker =
+                    ((InternalExecutionGraphAccessor) 
newScheduler.getExecutionGraph())
+                            .getPartitionTracker();
+            List<ResultPartitionID> resultPartitionIds =
+                    vertex.getProducedPartitions().keySet().stream()
+                            .map(
+                                    ((DefaultExecutionGraph) 
newScheduler.getExecutionGraph())
+                                            ::createResultPartitionId)
+                            .collect(Collectors.toList());
+            for (ResultPartitionID partitionID : resultPartitionIds) {
+                
assertThat(partitionTracker.isPartitionTracked(partitionID)).isTrue();
+            }
+        }
+    }
+
+    @TestTemplate
+    void 
testOpCoordUnsupportedBatchSnapshotWithJobVertexFinishedAndPartitionNotFoundTwice()
+            throws Exception {
+        JobGraph jobGraph = deserializeJobGraph(serializedJobGraph);
+        JobVertex jobVertex = jobGraph.findVertexByID(SOURCE_ID);
+        jobVertex.addOperatorCoordinator(
+                new SerializedValue<>(
+                        new TestingOperatorCoordinator.Provider(
+                                
jobVertex.getOperatorIDs().get(0).getGeneratedOperatorID())));
+        AdaptiveBatchScheduler scheduler = createScheduler(jobGraph);
+
+        // start scheduling.
+        runInMainThread(scheduler::startScheduling);
+
+        runInMainThread(
+                () -> {
+                    // trigger all source finished
+                    transitionExecutionsState(scheduler, 
ExecutionState.FINISHED, SOURCE_ID);
+                });
+
+        waitWriteJobFinishedEventCompleted(5);
+        runInMainThread(
+                () -> {
+                    // flush events.
+                    jobEventStore.stop(false);
+                });
+
+        // register partitions
+        registerPartitions(scheduler);
+
+        // start a new scheduler and try to recover.
+        AdaptiveBatchScheduler newScheduler = createScheduler(jobGraph);
+        startSchedulingAndWaitRecoverFinish(newScheduler);
+
+        // check source vertices state were recovered.
+        for (ExecutionVertex vertex :
+                getExecutionVertices(SOURCE_ID, 
newScheduler.getExecutionGraph())) {
+            // check state.
+            
assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.FINISHED);
+        }
+
+        // the first time trigger partition not found
+        ExecutionVertex firstMiddle0 =
+                getExecutionVertex(MIDDLE_ID, 0, 
newScheduler.getExecutionGraph());
+        triggerFailedByDataConsumptionException(newScheduler, firstMiddle0);
+
+        waitUntilExecutionVertexState(firstMiddle0, ExecutionState.CREATED, 
5000L);
+
+        // verify all source executionVertices were restarted
+        for (int i = 0; i < 5; i++) {
+            assertThat(
+                            getExecutionVertex(SOURCE_ID, i, 
newScheduler.getExecutionGraph())
+                                    .getExecutionState())
+                    .isNotEqualTo(ExecutionState.FINISHED);
+        }
+
+        // trigger all source finished.
+        runInMainThread(
+                () -> transitionExecutionsState(newScheduler, 
ExecutionState.FINISHED, SOURCE_ID));
+
+        // the second time trigger partition not found
+        firstMiddle0 = getExecutionVertex(MIDDLE_ID, 0, 
newScheduler.getExecutionGraph());
+        triggerFailedByDataConsumptionException(newScheduler, firstMiddle0);
+
+        waitUntilExecutionVertexState(firstMiddle0, ExecutionState.CREATED, 
5000L);
+
+        // verify only source task 0 were restarted
+        assertThat(
+                        getExecutionVertex(SOURCE_ID, 0, 
newScheduler.getExecutionGraph())
+                                .getExecutionState())
+                .isNotEqualTo(ExecutionState.FINISHED);
+
+        for (int i = 1; i < 5; i++) {
+            assertThat(
+                            getExecutionVertex(SOURCE_ID, i, 
newScheduler.getExecutionGraph())
+                                    .getExecutionState())
+                    .isEqualTo(ExecutionState.FINISHED);
+        }
+    }
+
+    /**
+     * Recovers with a {@link JobEventStore} that reports itself as non-empty but throws from
+     * {@code readEvent()}.
+     *
+     * <p>Asserted outcome: the execution graph's failure cause contains the message "Recover
+     * failed from JM failover", and every source vertex is on attempt number 1 in state
+     * DEPLOYING.
+     */
+    @TestTemplate
+    void testReplayEventFailed() throws Exception {
+        // Install a manually triggered executor before the scheduler is created.
+        final ManuallyTriggeredScheduledExecutor manualRestartExecutor =
+                new ManuallyTriggeredScheduledExecutor();
+        delayedExecutor = manualRestartExecutor;
+
+        // Event store whose only non-trivial behavior is a failing read.
+        final JobEventStore brokenEventStore =
+                new JobEventStore() {
+                    @Override
+                    public boolean isEmpty() {
+                        return false;
+                    }
+
+                    @Override
+                    public JobEvent readEvent() throws Exception {
+                        throw new Exception();
+                    }
+
+                    @Override
+                    public void writeEvent(JobEvent event, boolean cutBlock) {}
+
+                    @Override
+                    public void start() {}
+
+                    @Override
+                    public void stop(boolean clear) {}
+                };
+
+        final JobGraph jobGraph = deserializeJobGraph(serializedJobGraph);
+        final AdaptiveBatchScheduler newScheduler = createScheduler(jobGraph, brokenEventStore);
+        startSchedulingAndWaitRecoverFinish(newScheduler);
+
+        // Trigger scheduled restarting and drain the main thread actions.
+        manualRestartExecutor.triggerScheduledTasks();
+        runInMainThread(() -> {});
+
+        assertThat(
+                        ExceptionUtils.findThrowableWithMessage(
+                                newScheduler.getExecutionGraph().getFailureCause(),
+                                "Recover failed from JM failover"))
+                .isPresent();
+
+        // Source should be scheduled.
+        for (ExecutionVertex sourceVertex :
+                getExecutionVertices(SOURCE_ID, newScheduler.getExecutionGraph())) {
+            final int attemptNumber = sourceVertex.getCurrentExecutionAttempt().getAttemptNumber();
+            assertThat(attemptNumber).isEqualTo(1);
+            assertThat(sourceVertex.getExecutionState()).isEqualTo(ExecutionState.DEPLOYING);
+        }
+    }
+
+    @TestTemplate
+    void testRecoverFromJMFailover() throws Exception {

Review Comment:
   I think this is the most common case, so it should be the first test in this 
test class. The other tests only cover special cases.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to