XComp commented on a change in pull request #14999:
URL: https://github.com/apache/flink/pull/14999#discussion_r582758886



##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestUtils.java
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
+import org.apache.flink.runtime.checkpoint.Checkpoints;
+import org.apache.flink.runtime.checkpoint.OperatorState;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobType;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
+import org.apache.flink.runtime.state.OperatorStreamStateHandle;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Random;
+
+import static 
org.apache.flink.runtime.checkpoint.StateHandleDummyUtil.createNewInputChannelStateHandle;
+import static 
org.apache.flink.runtime.checkpoint.StateHandleDummyUtil.createNewResultSubpartitionStateHandle;
+import static 
org.apache.flink.runtime.checkpoint.StateObjectCollection.singleton;
+
+/** Class for common test utilities. */
+public class TestUtils {
+
+    public static File createSavepointWithOperatorState(
+            File savepointFile, long savepointId, OperatorID... operatorIds) 
throws IOException {
+        final Collection<OperatorState> operatorStates = 
createOperatorState(operatorIds);
+        final CheckpointMetadata savepoint =
+                new CheckpointMetadata(savepointId, operatorStates, 
Collections.emptyList());
+
+        try (FileOutputStream fileOutputStream = new 
FileOutputStream(savepointFile)) {
+            Checkpoints.storeCheckpointMetadata(savepoint, fileOutputStream);
+        }
+
+        return savepointFile;
+    }
+
+    private static Collection<OperatorState> createOperatorState(OperatorID... 
operatorIds) {
+        Random random = new Random();
+        Collection<OperatorState> operatorStates = new 
ArrayList<>(operatorIds.length);
+
+        for (OperatorID operatorId : operatorIds) {
+            final OperatorState operatorState = new OperatorState(operatorId, 
1, 42);
+            final OperatorSubtaskState subtaskState =
+                    OperatorSubtaskState.builder()
+                            .setManagedOperatorState(
+                                    new OperatorStreamStateHandle(
+                                            Collections.emptyMap(),
+                                            new 
ByteStreamStateHandle("foobar", new byte[0])))
+                            .setInputChannelState(
+                                    
singleton(createNewInputChannelStateHandle(10, random)))
+                            .setResultSubpartitionState(
+                                    
singleton(createNewResultSubpartitionStateHandle(10, random)))
+                            .build();
+            operatorState.putState(0, subtaskState);
+            operatorStates.add(operatorState);
+        }
+
+        return operatorStates;
+    }
+
+    @Nonnull
+    public static JobGraph createJobGraphFromJobVerticesWithCheckpointing(
+            SavepointRestoreSettings savepointRestoreSettings, JobVertex... 
jobVertices) {
+        final JobGraph jobGraph = new JobGraph(jobVertices);
+
+        // enable checkpointing which is required to resume from a savepoint
+        final CheckpointCoordinatorConfiguration 
checkpoinCoordinatorConfiguration =
+                new CheckpointCoordinatorConfiguration(

Review comment:
       We might want to switch to `CheckpointCoordinatorConfiguration.Builder` 
here as the constructor is deprecated. This would also make the code easier to 
read.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
##########
@@ -571,6 +566,115 @@ public void 
testRequestPartitionStateFailsInIllegalState() throws Exception {
         scheduler.requestPartitionState(new IntermediateDataSetID(), new 
ResultPartitionID());
     }
 
+    @Test
+    public void testRestoringModifiedJobFromSavepointFails() throws Exception {
+        // create savepoint data
+        final long savepointId = 42L;
+        final OperatorID operatorID = new OperatorID();
+        final File savepointFile =
+                TestUtils.createSavepointWithOperatorState(
+                        TEMPORARY_FOLDER.newFile(), savepointId, operatorID);
+
+        // set savepoint settings which don't allow non restored state
+        final SavepointRestoreSettings savepointRestoreSettings =
+                
SavepointRestoreSettings.forPath(savepointFile.getAbsolutePath(), false);
+
+        // create a new operator
+        final JobVertex jobVertex = new JobVertex("New operator");
+        jobVertex.setInvokableClass(NoOpInvokable.class);
+        final JobGraph jobGraphWithNewOperator =
+                TestUtils.createJobGraphFromJobVerticesWithCheckpointing(
+                        savepointRestoreSettings, jobVertex);
+
+        final DefaultDeclarativeSlotPool declarativeSlotPool =
+                createDeclarativeSlotPool(jobGraphWithNewOperator.getJobID());
+
+        final AdaptiveScheduler adaptiveScheduler =
+                new AdaptiveSchedulerBuilder(
+                                jobGraphWithNewOperator,
+                                
ComponentMainThreadExecutorServiceAdapter.forMainThread())
+                        .setDeclarativeSlotPool(declarativeSlotPool)
+                        .build();
+
+        adaptiveScheduler.startScheduling();
+
+        offerSlots(

Review comment:
       I'm wondering whether we should move these utility methods into their own 
utility class, since we're using them not only in 
`DefaultDeclarativeSlotPoolTest` but also in `AdaptiveSchedulerTest`. But that 
might be out of scope for this ticket.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
##########
@@ -571,6 +566,115 @@ public void 
testRequestPartitionStateFailsInIllegalState() throws Exception {
         scheduler.requestPartitionState(new IntermediateDataSetID(), new 
ResultPartitionID());
     }
 
+    @Test
+    public void testRestoringModifiedJobFromSavepointFails() throws Exception {
+        // create savepoint data
+        final long savepointId = 42L;
+        final OperatorID operatorID = new OperatorID();
+        final File savepointFile =
+                TestUtils.createSavepointWithOperatorState(
+                        TEMPORARY_FOLDER.newFile(), savepointId, operatorID);
+
+        // set savepoint settings which don't allow non restored state
+        final SavepointRestoreSettings savepointRestoreSettings =
+                
SavepointRestoreSettings.forPath(savepointFile.getAbsolutePath(), false);
+
+        // create a new operator
+        final JobVertex jobVertex = new JobVertex("New operator");
+        jobVertex.setInvokableClass(NoOpInvokable.class);
+        final JobGraph jobGraphWithNewOperator =
+                TestUtils.createJobGraphFromJobVerticesWithCheckpointing(
+                        savepointRestoreSettings, jobVertex);
+
+        final DefaultDeclarativeSlotPool declarativeSlotPool =
+                createDeclarativeSlotPool(jobGraphWithNewOperator.getJobID());
+
+        final AdaptiveScheduler adaptiveScheduler =
+                new AdaptiveSchedulerBuilder(
+                                jobGraphWithNewOperator,
+                                
ComponentMainThreadExecutorServiceAdapter.forMainThread())
+                        .setDeclarativeSlotPool(declarativeSlotPool)
+                        .build();
+
+        adaptiveScheduler.startScheduling();
+
+        offerSlots(
+                declarativeSlotPool,
+                createSlotOffersForResourceRequirements(
+                        ResourceCounter.withResource(ResourceProfile.UNKNOWN, 
1)));
+
+        final ArchivedExecutionGraph archivedExecutionGraph = 
adaptiveScheduler.requestJob();
+
+        assertThat(archivedExecutionGraph.getState(), is(JobStatus.FAILED));
+        assertThat(
+                archivedExecutionGraph.getFailureInfo().getException(),
+                FlinkMatchers.containsMessage("Failed to rollback to 
checkpoint/savepoint"));
+    }
+
+    @Nonnull
+    private DefaultDeclarativeSlotPool createDeclarativeSlotPool(JobID jobId) {

Review comment:
       This could be made a static utility method and moved down into the Utils 
section of this class.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
##########
@@ -968,6 +982,75 @@ public void testExceptionHistoryWithRestartableFailure() {
                 
lessThanOrEqualTo(updateStateTriggeringJobFailureTimeframe.upperEndpoint()));
     }
 
+    @Test(expected = IllegalStateException.class)
+    public void testRestoringModifiedJobFromSavepointFails() throws Exception {
+        // create savepoint data
+        final long savepointId = 42L;
+        final OperatorID operatorID = new OperatorID();
+        final File savepointFile =
+                TestUtils.createSavepointWithOperatorState(
+                        TEMPORARY_FOLDER.newFile(), savepointId, operatorID);
+
+        // set savepoint settings which don't allow non restored state
+        final SavepointRestoreSettings savepointRestoreSettings =
+                
SavepointRestoreSettings.forPath(savepointFile.getAbsolutePath(), false);
+
+        // create a new operator

Review comment:
       ```suggestion
           // create a new operator
           // this test will fail in the end due to the previously created 
Savepoint having a state for a given OperatorID that does not match any 
operator of the newly created JobGraph
   ```
   (please correct me if I'm wrong with the suggested comment above)
   I would suggest adding a comment about the rationale of the test here, to 
make it easier for a reader to understand why we expect the test to fail.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
##########
@@ -968,6 +982,75 @@ public void testExceptionHistoryWithRestartableFailure() {
                 
lessThanOrEqualTo(updateStateTriggeringJobFailureTimeframe.upperEndpoint()));
     }
 
+    @Test(expected = IllegalStateException.class)
+    public void testRestoringModifiedJobFromSavepointFails() throws Exception {
+        // create savepoint data
+        final long savepointId = 42L;
+        final OperatorID operatorID = new OperatorID();
+        final File savepointFile =
+                TestUtils.createSavepointWithOperatorState(
+                        TEMPORARY_FOLDER.newFile(), savepointId, operatorID);
+
+        // set savepoint settings which don't allow non restored state
+        final SavepointRestoreSettings savepointRestoreSettings =
+                
SavepointRestoreSettings.forPath(savepointFile.getAbsolutePath(), false);
+
+        // create a new operator

Review comment:
       This would apply to the AdaptiveSchedulerTest accordingly.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
##########
@@ -968,6 +982,75 @@ public void testExceptionHistoryWithRestartableFailure() {
                 
lessThanOrEqualTo(updateStateTriggeringJobFailureTimeframe.upperEndpoint()));
     }
 
+    @Test(expected = IllegalStateException.class)

Review comment:
       Shouldn't we do a check for the error message here as well, similarly to 
what is done in `AdaptiveSchedulerTest`, since `IllegalStateException` might be 
quite generic?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
##########
@@ -968,6 +982,75 @@ public void testExceptionHistoryWithRestartableFailure() {
                 
lessThanOrEqualTo(updateStateTriggeringJobFailureTimeframe.upperEndpoint()));
     }
 
+    @Test(expected = IllegalStateException.class)
+    public void testRestoringModifiedJobFromSavepointFails() throws Exception {
+        // create savepoint data
+        final long savepointId = 42L;
+        final OperatorID operatorID = new OperatorID();
+        final File savepointFile =
+                TestUtils.createSavepointWithOperatorState(
+                        TEMPORARY_FOLDER.newFile(), savepointId, operatorID);
+
+        // set savepoint settings which don't allow non restored state
+        final SavepointRestoreSettings savepointRestoreSettings =
+                
SavepointRestoreSettings.forPath(savepointFile.getAbsolutePath(), false);
+
+        // create a new operator
+        final JobVertex jobVertex = new JobVertex("New operator");
+        jobVertex.setInvokableClass(NoOpInvokable.class);
+        final JobGraph jobGraphWithNewOperator =
+                TestUtils.createJobGraphFromJobVerticesWithCheckpointing(
+                        savepointRestoreSettings, jobVertex);
+
+        // creating the DefaultScheduler should try to restore the 
ExecutionGraph
+        SchedulerTestingUtils.newSchedulerBuilder(
+                        jobGraphWithNewOperator,
+                        
ComponentMainThreadExecutorServiceAdapter.forMainThread())
+                .build();
+    }
+
+    @Test
+    public void 
testRestoringModifiedJobFromSavepointWithAllowNonRestoredStateSucceeds()
+            throws Exception {
+        // create savepoint data
+        final long savepointId = 42L;
+        final OperatorID operatorID = new OperatorID();
+        final File savepointFile =
+                TestUtils.createSavepointWithOperatorState(
+                        TEMPORARY_FOLDER.newFile(), savepointId, operatorID);
+
+        // allow for non restored state
+        final SavepointRestoreSettings savepointRestoreSettings =
+                
SavepointRestoreSettings.forPath(savepointFile.getAbsolutePath(), true);
+
+        // create a new operator
+        final JobVertex jobVertex = new JobVertex("New operator");
+        jobVertex.setInvokableClass(NoOpInvokable.class);
+        final JobGraph jobGraphWithNewOperator =
+                TestUtils.createJobGraphFromJobVerticesWithCheckpointing(
+                        savepointRestoreSettings, jobVertex);
+
+        final StandaloneCompletedCheckpointStore completedCheckpointStore =
+                new StandaloneCompletedCheckpointStore(1);
+        final CheckpointRecoveryFactory testingCheckpointRecoveryFactory =
+                useSameServicesForAllJobs(
+                        completedCheckpointStore, new 
StandaloneCheckpointIDCounter());
+
+        SchedulerTestingUtils.newSchedulerBuilder(
+                        jobGraphWithNewOperator,
+                        
ComponentMainThreadExecutorServiceAdapter.forMainThread())
+                .setCheckpointRecoveryFactory(testingCheckpointRecoveryFactory)
+                .build();
+
+        // creating the DefaultScheduler should have read the savepoint
+        final CompletedCheckpoint savepointCheckpoint =
+                completedCheckpointStore.getLatestCheckpoint(false);
+
+        MatcherAssert.assertThat(savepointCheckpoint, notNullValue());
+
+        MatcherAssert.assertThat(savepointCheckpoint.getCheckpointID(), 
is(savepointId));

Review comment:
       This applies to the `AdaptiveSchedulerTest` accordingly.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
##########
@@ -968,6 +982,75 @@ public void testExceptionHistoryWithRestartableFailure() {
                 
lessThanOrEqualTo(updateStateTriggeringJobFailureTimeframe.upperEndpoint()));
     }
 
+    @Test(expected = IllegalStateException.class)
+    public void testRestoringModifiedJobFromSavepointFails() throws Exception {
+        // create savepoint data
+        final long savepointId = 42L;
+        final OperatorID operatorID = new OperatorID();
+        final File savepointFile =
+                TestUtils.createSavepointWithOperatorState(
+                        TEMPORARY_FOLDER.newFile(), savepointId, operatorID);
+
+        // set savepoint settings which don't allow non restored state
+        final SavepointRestoreSettings savepointRestoreSettings =
+                
SavepointRestoreSettings.forPath(savepointFile.getAbsolutePath(), false);
+
+        // create a new operator
+        final JobVertex jobVertex = new JobVertex("New operator");
+        jobVertex.setInvokableClass(NoOpInvokable.class);
+        final JobGraph jobGraphWithNewOperator =
+                TestUtils.createJobGraphFromJobVerticesWithCheckpointing(
+                        savepointRestoreSettings, jobVertex);
+
+        // creating the DefaultScheduler should try to restore the 
ExecutionGraph
+        SchedulerTestingUtils.newSchedulerBuilder(
+                        jobGraphWithNewOperator,
+                        
ComponentMainThreadExecutorServiceAdapter.forMainThread())
+                .build();
+    }
+
+    @Test
+    public void 
testRestoringModifiedJobFromSavepointWithAllowNonRestoredStateSucceeds()
+            throws Exception {
+        // create savepoint data
+        final long savepointId = 42L;
+        final OperatorID operatorID = new OperatorID();
+        final File savepointFile =
+                TestUtils.createSavepointWithOperatorState(
+                        TEMPORARY_FOLDER.newFile(), savepointId, operatorID);
+
+        // allow for non restored state
+        final SavepointRestoreSettings savepointRestoreSettings =
+                
SavepointRestoreSettings.forPath(savepointFile.getAbsolutePath(), true);
+
+        // create a new operator
+        final JobVertex jobVertex = new JobVertex("New operator");
+        jobVertex.setInvokableClass(NoOpInvokable.class);
+        final JobGraph jobGraphWithNewOperator =
+                TestUtils.createJobGraphFromJobVerticesWithCheckpointing(
+                        savepointRestoreSettings, jobVertex);
+
+        final StandaloneCompletedCheckpointStore completedCheckpointStore =
+                new StandaloneCompletedCheckpointStore(1);
+        final CheckpointRecoveryFactory testingCheckpointRecoveryFactory =
+                useSameServicesForAllJobs(
+                        completedCheckpointStore, new 
StandaloneCheckpointIDCounter());
+
+        SchedulerTestingUtils.newSchedulerBuilder(
+                        jobGraphWithNewOperator,
+                        
ComponentMainThreadExecutorServiceAdapter.forMainThread())
+                .setCheckpointRecoveryFactory(testingCheckpointRecoveryFactory)
+                .build();
+
+        // creating the DefaultScheduler should have read the savepoint
+        final CompletedCheckpoint savepointCheckpoint =
+                completedCheckpointStore.getLatestCheckpoint(false);
+
+        MatcherAssert.assertThat(savepointCheckpoint, notNullValue());
+
+        MatcherAssert.assertThat(savepointCheckpoint.getCheckpointID(), 
is(savepointId));

Review comment:
       ```suggestion
           final CompletedCheckpoint savepoint =
                   completedCheckpointStore.getLatestCheckpoint(false);
   
           MatcherAssert.assertThat(savepoint, notNullValue());
   
           MatcherAssert.assertThat(savepoint.getCheckpointID(), 
is(savepointId));
   ```
   I would remove the "Checkpoint" part here. At least to me, it was a bit 
confusing: Is it a checkpoint or a savepoint?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to