tillrohrmann commented on a change in pull request #14999:
URL: https://github.com/apache/flink/pull/14999#discussion_r582981871
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
##########
@@ -571,6 +566,115 @@ public void
testRequestPartitionStateFailsInIllegalState() throws Exception {
scheduler.requestPartitionState(new IntermediateDataSetID(), new
ResultPartitionID());
}
+ @Test
+ public void testRestoringModifiedJobFromSavepointFails() throws Exception {
+ // create savepoint data
+ final long savepointId = 42L;
+ final OperatorID operatorID = new OperatorID();
+ final File savepointFile =
+ TestUtils.createSavepointWithOperatorState(
+ TEMPORARY_FOLDER.newFile(), savepointId, operatorID);
+
+ // set savepoint settings which don't allow non restored state
+ final SavepointRestoreSettings savepointRestoreSettings =
+
SavepointRestoreSettings.forPath(savepointFile.getAbsolutePath(), false);
+
+ // create a new operator
+ final JobVertex jobVertex = new JobVertex("New operator");
+ jobVertex.setInvokableClass(NoOpInvokable.class);
+ final JobGraph jobGraphWithNewOperator =
+ TestUtils.createJobGraphFromJobVerticesWithCheckpointing(
+ savepointRestoreSettings, jobVertex);
+
+ final DefaultDeclarativeSlotPool declarativeSlotPool =
+ createDeclarativeSlotPool(jobGraphWithNewOperator.getJobID());
+
+ final AdaptiveScheduler adaptiveScheduler =
+ new AdaptiveSchedulerBuilder(
+ jobGraphWithNewOperator,
+
ComponentMainThreadExecutorServiceAdapter.forMainThread())
+ .setDeclarativeSlotPool(declarativeSlotPool)
+ .build();
+
+ adaptiveScheduler.startScheduling();
+
+ offerSlots(
+ declarativeSlotPool,
+ createSlotOffersForResourceRequirements(
+ ResourceCounter.withResource(ResourceProfile.UNKNOWN,
1)));
+
+ final ArchivedExecutionGraph archivedExecutionGraph =
adaptiveScheduler.requestJob();
+
+ assertThat(archivedExecutionGraph.getState(), is(JobStatus.FAILED));
+ assertThat(
+ archivedExecutionGraph.getFailureInfo().getException(),
+ FlinkMatchers.containsMessage("Failed to rollback to
checkpoint/savepoint"));
+ }
+
+ @Nonnull
+ private DefaultDeclarativeSlotPool createDeclarativeSlotPool(JobID jobId) {
Review comment:
True. Will change it.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org