[
https://issues.apache.org/jira/browse/FLINK-4150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15387744#comment-15387744
]
ASF GitHub Bot commented on FLINK-4150:
---------------------------------------
Github user uce commented on a diff in the pull request:
https://github.com/apache/flink/pull/2256#discussion_r71711908
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java ---
@@ -243,6 +256,268 @@ public void testJobRecoveryWhenLosingLeadership() throws Exception {
}
}
+	/**
+	 * Tests that the persisted job is not removed from the job graph store
+	 * after the postStop method of the JobManager. Furthermore, it checks
+	 * that BLOBs of the JobGraph are recovered properly and cleaned up after
+	 * the job finishes.
+	 */
+	@Test
+	public void testBlobRecoveryAfterLostJobManager() throws Exception {
+		FiniteDuration timeout = new FiniteDuration(30, TimeUnit.SECONDS);
+		FiniteDuration jobRecoveryTimeout = new FiniteDuration(3, TimeUnit.SECONDS);
+		Deadline deadline = new FiniteDuration(2, TimeUnit.MINUTES).fromNow();
+		Configuration flinkConfiguration = new Configuration();
+		UUID leaderSessionID = UUID.randomUUID();
+		UUID newLeaderSessionID = UUID.randomUUID();
+		int slots = 2;
+		ActorRef archiveRef = null;
+		ActorRef jobManagerRef = null;
+		ActorRef taskManagerRef = null;
+
+		String haStoragePath = temporaryFolder.newFolder().toString();
+
+		flinkConfiguration.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
+		flinkConfiguration.setString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH, haStoragePath);
+		flinkConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, slots);
+
+		try {
+			MySubmittedJobGraphStore mySubmittedJobGraphStore = new MySubmittedJobGraphStore();
+			TestingLeaderElectionService myLeaderElectionService = new TestingLeaderElectionService();
+			TestingLeaderRetrievalService myLeaderRetrievalService = new TestingLeaderRetrievalService();
+
+			archiveRef = system.actorOf(Props.create(
+				MemoryArchivist.class,
+				10), "archive");
+
+			jobManagerRef = createJobManagerActor(
+				"jobmanager-0",
+				flinkConfiguration,
+				myLeaderElectionService,
+				mySubmittedJobGraphStore,
+				3600000,
+				timeout,
+				jobRecoveryTimeout, archiveRef);
+
+			ActorGateway jobManager = new AkkaActorGateway(jobManagerRef, leaderSessionID);
+
+			taskManagerRef = TaskManager.startTaskManagerComponentsAndActor(
+				flinkConfiguration,
+				ResourceID.generate(),
+				system,
+				"localhost",
+				Option.apply("taskmanager"),
+				Option.apply((LeaderRetrievalService) myLeaderRetrievalService),
+				true,
+				TestingTaskManager.class);
+
+			ActorGateway tmGateway = new AkkaActorGateway(taskManagerRef, leaderSessionID);
+
+			Future<Object> tmAlive = tmGateway.ask(TestingMessages.getAlive(), deadline.timeLeft());
+
+			Await.ready(tmAlive, deadline.timeLeft());
+
+			JobVertex sourceJobVertex = new JobVertex("Source");
+			sourceJobVertex.setInvokableClass(BlockingInvokable.class);
+			sourceJobVertex.setParallelism(slots);
+
+			JobGraph jobGraph = new JobGraph("TestingJob", sourceJobVertex);
+
+			// Upload fake JAR file to first JobManager
+			File jarFile = temporaryFolder.newFile();
+			ZipOutputStream out = new ZipOutputStream(new FileOutputStream(jarFile));
+			out.close();
+
+			jobGraph.addJar(new Path(jarFile.toURI()));
+			JobClient.uploadJarFiles(jobGraph, jobManager, deadline.timeLeft());
+
+			Future<Object> isLeader = jobManager.ask(
+				TestingJobManagerMessages.getNotifyWhenLeader(),
+				deadline.timeLeft());
+
+			Future<Object> isConnectedToJobManager = tmGateway.ask(
+				new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManagerRef),
+				deadline.timeLeft());
+
+			// tell the jobManager that it is the leader
+			myLeaderElectionService.isLeader(leaderSessionID);
+			// tell the taskManager who the leader is
+			myLeaderRetrievalService.notifyListener(jobManager.path(), leaderSessionID);
+
+			Await.ready(isLeader, deadline.timeLeft());
+			Await.ready(isConnectedToJobManager, deadline.timeLeft());
+
+			// submit blocking job
+			Future<Object> jobSubmitted = jobManager.ask(
+				new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.DETACHED),
+				deadline.timeLeft());
+
+			Await.ready(jobSubmitted, deadline.timeLeft());
+
+			// wait for the job to reach state RUNNING
+			Future<Object> jobRunning = jobManager.ask(
+				new TestingJobManagerMessages.NotifyWhenJobStatus(jobGraph.getJobID(), JobStatus.RUNNING),
+				deadline.timeLeft());
+
+			Await.ready(jobRunning, deadline.timeLeft());
+
+			// terminate the job manager
+			jobManagerRef.tell(PoisonPill.getInstance(), ActorRef.noSender());
+
+			Future<Boolean> terminatedFuture = Patterns.gracefulStop(jobManagerRef, deadline.timeLeft());
+			Boolean terminated = Await.result(terminatedFuture, deadline.timeLeft());
+			assertTrue("Failed to stop job manager", terminated);
+
+			// the job stays in the submitted job graph store
+			assertTrue(mySubmittedJobGraphStore.contains(jobGraph.getJobID()));
+
+			// start a new job manager
+			myLeaderElectionService.reset();
+
+			jobManagerRef = createJobManagerActor(
+				"jobmanager-1",
+				flinkConfiguration,
+				myLeaderElectionService,
+				mySubmittedJobGraphStore,
+				500,
+				timeout,
+				jobRecoveryTimeout,
+				archiveRef);
+
+			jobManager = new AkkaActorGateway(jobManagerRef, newLeaderSessionID);
+
+			Future<Object> isAlive = jobManager.ask(TestingMessages.getAlive(), deadline.timeLeft());
+
+			isLeader = jobManager.ask(
+				TestingJobManagerMessages.getNotifyWhenLeader(),
+				deadline.timeLeft());
+
+			isConnectedToJobManager = tmGateway.ask(
+				new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManagerRef),
+				deadline.timeLeft());
+
+			Await.ready(isAlive, deadline.timeLeft());
+
+			// tell the new jobManager that it is the leader
+			myLeaderElectionService.isLeader(newLeaderSessionID);
+			// tell the taskManager who the leader is
+			myLeaderRetrievalService.notifyListener(jobManager.path(), newLeaderSessionID);
+
+			Await.ready(isLeader, deadline.timeLeft());
+			Await.ready(isConnectedToJobManager, deadline.timeLeft());
+
+			jobRunning = jobManager.ask(
+				new TestingJobManagerMessages.NotifyWhenJobStatus(jobGraph.getJobID(), JobStatus.RUNNING),
+				deadline.timeLeft());
+
+			// wait until the job is recovered and reaches state RUNNING
+			Await.ready(jobRunning, deadline.timeLeft());
+
+			Future<Object> jobFinished = jobManager.ask(
+				new TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID()),
+				deadline.timeLeft());
+
+			BlockingInvokable.unblock();
+
+			// wait until the job has finished
+			Await.ready(jobFinished, deadline.timeLeft());
+
+			// check that the job has been removed from the submitted job graph store
+			assertFalse(mySubmittedJobGraphStore.contains(jobGraph.getJobID()));
+
+			// check that the BLOB store files are removed
+			File rootPath = new File(haStoragePath);
+
+			boolean cleanedUpFiles = false;
+			while (deadline.hasTimeLeft()) {
+				if (listFiles(rootPath).isEmpty()) {
--- End diff --
Yes, that is true. We will, for example, have empty folders like
`<root>/blob/cache` in this test. I've added a method that tries to delete the
parent directory when deleting a BLOB (the same as what we are currently doing
in `AbstractFileStateHandle`). I will adjust this check to verify that the
directory is empty.
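For reference, a minimal sketch of that cleanup, assuming Flink's
`org.apache.flink.core.fs` API; the class and method names are illustrative,
not the actual PR code, and only the strategy (delete the BLOB, then remove its
parent directory if it turned out empty) mirrors `AbstractFileStateHandle`:
{code}
import java.io.IOException;

import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

/** Sketch only; names are illustrative, not the PR's actual code. */
class BlobCleanupSketch {

	/** Deletes a BLOB file, then tries to remove its now-empty parent directory. */
	static void deleteBlobAndParentIfEmpty(Path blobPath) throws IOException {
		FileSystem fs = blobPath.getFileSystem();
		fs.delete(blobPath, false);

		// Best-effort cleanup, mirroring AbstractFileStateHandle: remove the
		// parent directory only if no other BLOBs remain in it.
		Path parent = blobPath.getParent();
		try {
			FileStatus[] remaining = fs.listStatus(parent);
			if (remaining == null || remaining.length == 0) {
				fs.delete(parent, false);
			}
		} catch (IOException ignored) {
			// Directory already gone or repopulated by a concurrent write;
			// leaving it in place is safe.
		}
	}
}
{code}
The adjusted test check could then poll until the HA storage directory contains
neither files nor leftover subdirectories, along these lines (reusing the
`rootPath`, `deadline`, and `cleanedUpFiles` variables from the test above):
{code}
// Wait until the BLOB store root contains neither files nor directories.
// A null result means the directory itself has been removed.
boolean cleanedUpFiles = false;
while (deadline.hasTimeLeft()) {
	File[] entries = rootPath.listFiles();
	if (entries == null || entries.length == 0) {
		cleanedUpFiles = true;
		break;
	}
	Thread.sleep(100);
}
assertTrue("BLOB store directory was not cleaned up", cleanedUpFiles);
{code}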
> Problem with Blobstore in Yarn HA setting on recovery after cluster shutdown
> ----------------------------------------------------------------------------
>
> Key: FLINK-4150
> URL: https://issues.apache.org/jira/browse/FLINK-4150
> Project: Flink
> Issue Type: Bug
> Components: Job-Submission
> Reporter: Stefan Richter
> Assignee: Ufuk Celebi
> Priority: Blocker
> Fix For: 1.1.0
>
>
> Submitting a job in Yarn with HA can lead to the following exception:
> {code}
> org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
> ClassLoader info: URL ClassLoader:
>     file: '/tmp/blobStore-ccec0f4a-3e07-455f-945b-4fcd08f5bac1/cache/blob_7fafffe9595cd06aff213b81b5da7b1682e1d6b0' (invalid JAR: zip file is empty)
> Class not resolvable through given classloader.
> 	at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:207)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:222)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
> 	at java.lang.Thread.run(Thread.java:745)
> {code}
> Some job information, including the Blob ids, is stored in ZooKeeper. The
> actual Blobs are stored in a dedicated BlobStore if the recovery mode is set
> to ZooKeeper. This BlobStore is typically located in a FS like HDFS. When the
> cluster is shut down, the path for the BlobStore is deleted. When the cluster
> is then restarted, recovering jobs cannot be restored because their Blob ids
> stored in ZooKeeper now point to deleted files.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)