Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/2256#discussion_r71510844
--- Diff:
flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
---
@@ -243,6 +256,268 @@ public void testJobRecoveryWhenLosingLeadership()
throws Exception {
}
}
+ /**
+ * Tests that the persisted job is not removed from the job graph store
+ * after the postStop method of the JobManager. Furthermore, it checks
+ * that BLOBs of the JobGraph are recovered properly and cleaned up
after
+ * the job finishes.
+ */
+ @Test
+ public void testBlobRecoveryAfterLostJobManager() throws Exception {
+ FiniteDuration timeout = new FiniteDuration(30,
TimeUnit.SECONDS);
+ FiniteDuration jobRecoveryTimeout = new FiniteDuration(3,
TimeUnit.SECONDS);
+ Deadline deadline = new FiniteDuration(2,
TimeUnit.MINUTES).fromNow();
+ Configuration flinkConfiguration = new Configuration();
+ UUID leaderSessionID = UUID.randomUUID();
+ UUID newLeaderSessionID = UUID.randomUUID();
+ int slots = 2;
+ ActorRef archiveRef = null;
+ ActorRef jobManagerRef = null;
+ ActorRef taskManagerRef = null;
+
+ String haStoragePath = temporaryFolder.newFolder().toString();
+
+ flinkConfiguration.setString(ConfigConstants.RECOVERY_MODE,
"zookeeper");
+
flinkConfiguration.setString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH,
haStoragePath);
+
flinkConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS,
slots);
+
+ try {
+ MySubmittedJobGraphStore mySubmittedJobGraphStore = new
MySubmittedJobGraphStore();
+ TestingLeaderElectionService myLeaderElectionService =
new TestingLeaderElectionService();
+ TestingLeaderRetrievalService myLeaderRetrievalService
= new TestingLeaderRetrievalService();
+
+ archiveRef = system.actorOf(Props.create(
+ MemoryArchivist.class,
+ 10), "archive");
+
+ jobManagerRef = createJobManagerActor(
+ "jobmanager-0",
+ flinkConfiguration,
+ myLeaderElectionService,
+ mySubmittedJobGraphStore,
+ 3600000,
+ timeout,
+ jobRecoveryTimeout, archiveRef);
+
+ ActorGateway jobManager = new
AkkaActorGateway(jobManagerRef, leaderSessionID);
+
+ taskManagerRef =
TaskManager.startTaskManagerComponentsAndActor(
+ flinkConfiguration,
+ ResourceID.generate(),
+ system,
+ "localhost",
+ Option.apply("taskmanager"),
+ Option.apply((LeaderRetrievalService)
myLeaderRetrievalService),
+ true,
+ TestingTaskManager.class);
+
+ ActorGateway tmGateway = new
AkkaActorGateway(taskManagerRef, leaderSessionID);
+
+ Future<Object> tmAlive =
tmGateway.ask(TestingMessages.getAlive(), deadline.timeLeft());
+
+ Await.ready(tmAlive, deadline.timeLeft());
+
+ JobVertex sourceJobVertex = new JobVertex("Source");
+
sourceJobVertex.setInvokableClass(BlockingInvokable.class);
+ sourceJobVertex.setParallelism(slots);
+
+ JobGraph jobGraph = new JobGraph("TestingJob",
sourceJobVertex);
+
+ // Upload fake JAR file to first JobManager
+ File jarFile = temporaryFolder.newFile();
+ ZipOutputStream out = new ZipOutputStream(new
FileOutputStream(jarFile));
+ out.close();
+
+ jobGraph.addJar(new Path(jarFile.toURI()));
+ JobClient.uploadJarFiles(jobGraph, jobManager,
deadline.timeLeft());
+
+ Future<Object> isLeader = jobManager.ask(
+
TestingJobManagerMessages.getNotifyWhenLeader(),
+ deadline.timeLeft());
+
+ Future<Object> isConnectedToJobManager = tmGateway.ask(
+ new
TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManagerRef),
+ deadline.timeLeft());
+
+ // tell jobManager that he's the leader
+ myLeaderElectionService.isLeader(leaderSessionID);
+ // tell taskManager who's the leader
+
myLeaderRetrievalService.notifyListener(jobManager.path(), leaderSessionID);
+
+ Await.ready(isLeader, deadline.timeLeft());
+ Await.ready(isConnectedToJobManager,
deadline.timeLeft());
+
+ // submit blocking job
+ Future<Object> jobSubmitted = jobManager.ask(
+ new
JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.DETACHED),
+ deadline.timeLeft());
+
+ Await.ready(jobSubmitted, deadline.timeLeft());
+
+ // Wait for running
+ Future<Object> jobRunning = jobManager.ask(
+ new
TestingJobManagerMessages.NotifyWhenJobStatus(jobGraph.getJobID(),
JobStatus.RUNNING),
+ deadline.timeLeft());
+
+ Await.ready(jobRunning, deadline.timeLeft());
+
+ // terminate the job manager
+ jobManagerRef.tell(PoisonPill.getInstance(),
ActorRef.noSender());
+
+ Future<Boolean> terminatedFuture =
Patterns.gracefulStop(jobManagerRef, deadline.timeLeft());
+ Boolean terminated = Await.result(terminatedFuture,
deadline.timeLeft());
+ assertTrue("Failed to stop job manager", terminated);
+
+ // job stays in the submitted job graph store
+
assertTrue(mySubmittedJobGraphStore.contains(jobGraph.getJobID()));
+
+ // start new job manager
+ myLeaderElectionService.reset();
+
+ jobManagerRef = createJobManagerActor(
+ "jobmanager-1",
+ flinkConfiguration,
+ myLeaderElectionService,
+ mySubmittedJobGraphStore,
+ 500,
+ timeout,
+ jobRecoveryTimeout,
+ archiveRef);
+
+ jobManager = new AkkaActorGateway(jobManagerRef,
newLeaderSessionID);
+
+ Future<Object> isAlive =
jobManager.ask(TestingMessages.getAlive(), deadline.timeLeft());
+
+ isLeader = jobManager.ask(
+
TestingJobManagerMessages.getNotifyWhenLeader(),
+ deadline.timeLeft());
+
+ isConnectedToJobManager = tmGateway.ask(
+ new
TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManagerRef),
+ deadline.timeLeft());
+
+ Await.ready(isAlive, deadline.timeLeft());
+
+ // tell new jobManager that he's the leader
+ myLeaderElectionService.isLeader(newLeaderSessionID);
+ // tell taskManager who's the leader
+
myLeaderRetrievalService.notifyListener(jobManager.path(), newLeaderSessionID);
+
+ Await.ready(isLeader, deadline.timeLeft());
+ Await.ready(isConnectedToJobManager,
deadline.timeLeft());
+
+ jobRunning = jobManager.ask(
+ new
TestingJobManagerMessages.NotifyWhenJobStatus(jobGraph.getJobID(),
JobStatus.RUNNING),
+ deadline.timeLeft());
+
+ // wait that the job is recovered and reaches state
RUNNING
+ Await.ready(jobRunning, deadline.timeLeft());
+
+ Future<Object> jobFinished = jobManager.ask(
+ new
TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID()),
+ deadline.timeLeft());
+
+ BlockingInvokable.unblock();
+
+ // wait til the job has finished
+ Await.ready(jobFinished, deadline.timeLeft());
+
+ // check that the job has been removed from the
submitted job graph store
+
assertFalse(mySubmittedJobGraphStore.contains(jobGraph.getJobID()));
+
+ // Check that the BLOB store files are removed
+ File rootPath = new File(haStoragePath);
+
+ boolean cleanedUpFiles = false;
+ while (deadline.hasTimeLeft()) {
+ if (listFiles(rootPath).isEmpty()) {
--- End diff --
The test checks that the directory no longer contains any files, but it does not
check for leftover folders, right? I think we no longer delete the folders created
by the BlobStore. We could check in `BlobStore.cleanUp` whether any empty folders
remain and delete them as well. Do you think this could be relevant?
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it enabled, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---