[FLINK-9381] Release blobs after job termination

Properly remove job blobs from the BlobServer after the job terminates. If the
job reaches a globally terminal state, then the HA blob store files will also
be cleared. In case of a suspension, or if the job is not finished (e.g.
another process finishes the job concurrently), we only remove the local blob
server files.

Additionally, we properly release the user code class loader registered in the 
JobManagerRunner when it
closes.

This closes #6030.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2735c852
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2735c852
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2735c852

Branch: refs/heads/master
Commit: 2735c852b4648ae6f3e8f1e6169ef9ed6ec481d2
Parents: 8b95ba3
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Thu May 17 08:58:07 2018 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Thu May 17 11:31:46 2018 +0200

----------------------------------------------------------------------
 .../org/apache/flink/runtime/blob/BlobKey.java  |   2 +-
 .../apache/flink/runtime/blob/BlobServer.java   |   8 +-
 .../flink/runtime/dispatcher/Dispatcher.java    |  51 +--
 .../librarycache/BlobLibraryCacheManager.java   |   8 +
 .../librarycache/LibraryCacheManager.java       |  11 +
 .../runtime/jobmaster/JobManagerRunner.java     |   5 +-
 .../flink/runtime/jobmanager/JobManager.scala   |   4 +-
 .../runtime/blob/BlobCacheCleanupTest.java      |   2 +-
 .../runtime/blob/BlobServerDeleteTest.java      |   6 +-
 .../runtime/blob/BlobServerRecoveryTest.java    |   4 +-
 .../flink/runtime/blob/TestingBlobStore.java    |  73 +++++
 .../runtime/blob/TestingBlobStoreBuilder.java   |  65 ++++
 .../DispatcherResourceCleanupTest.java          | 327 +++++++++++++++++++
 .../BlobLibraryCacheRecoveryITCase.java         |   2 +-
 .../runtime/jobmaster/JobManagerRunnerTest.java |  22 ++
 15 files changed, 557 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2735c852/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java
index 988af8b..649992d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java
@@ -37,7 +37,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 /**
  * A BLOB key uniquely identifies a BLOB.
  */
-abstract class BlobKey implements Serializable, Comparable<BlobKey> {
+public abstract class BlobKey implements Serializable, Comparable<BlobKey> {
 
        private static final long serialVersionUID = 3847117712521785209L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2735c852/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index 92d1135..dd0155c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -801,11 +801,13 @@ public class BlobServer extends Thread implements 
BlobService, BlobWriter, Perma
         *
         * @param jobId
         *              ID of the job this blob belongs to
+        * @param cleanupBlobStoreFiles
+        *              True if the corresponding blob store files shall be 
cleaned up as well. Otherwise false.
         *
         * @return  <tt>true</tt> if the job directory is successfully deleted 
or non-existing;
         *          <tt>false</tt> otherwise
         */
-       public boolean cleanupJob(JobID jobId) {
+       public boolean cleanupJob(JobID jobId, boolean cleanupBlobStoreFiles) {
                checkNotNull(jobId);
 
                final File jobDir =
@@ -830,8 +832,8 @@ public class BlobServer extends Thread implements 
BlobService, BlobWriter, Perma
                                        jobDir.getAbsolutePath(), e);
                        }
 
-                       // delete in HA store
-                       boolean deletedHA = blobStore.deleteAll(jobId);
+                       // delete the HA blob store files, but only if requested
+                       final boolean deletedHA = !cleanupBlobStoreFiles || 
blobStore.deleteAll(jobId);
 
                        return deletedLocally && deletedHA;
                } finally {

http://git-wip-us.apache.org/repos/asf/flink/blob/2735c852/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 4d30870..5022388 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -536,25 +536,36 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
         * @param jobId JobID identifying the job to clean up
         * @param cleanupHA True iff HA data shall also be cleaned up
         */
-       private void removeJob(JobID jobId, boolean cleanupHA) {
+       private void removeJobAndRegisterTerminationFuture(JobID jobId, boolean 
cleanupHA) {
+               final CompletableFuture<Void> cleanupFuture = removeJob(jobId, 
cleanupHA);
+
+               registerOrphanedJobManagerTerminationFuture(cleanupFuture);
+       }
+
+       private CompletableFuture<Void> removeJob(JobID jobId, boolean 
cleanupHA) {
                JobManagerRunner jobManagerRunner = 
jobManagerRunners.remove(jobId);
 
+               final CompletableFuture<Void> jobManagerRunnerTerminationFuture;
                if (jobManagerRunner != null) {
-                       final CompletableFuture<Void> 
jobManagerRunnerTerminationFuture = jobManagerRunner.closeAsync();
-                       
registerOrphanedJobManagerTerminationFuture(jobManagerRunnerTerminationFuture);
+                       jobManagerRunnerTerminationFuture = 
jobManagerRunner.closeAsync();
+               } else {
+                       jobManagerRunnerTerminationFuture = 
CompletableFuture.completedFuture(null);
                }
 
-               jobManagerMetricGroup.removeJob(jobId);
-
-               if (cleanupHA) {
-                       try {
-                               submittedJobGraphStore.removeJobGraph(jobId);
-                       } catch (Exception e) {
-                               log.warn("Could not properly remove job {} from 
submitted job graph store.", jobId);
-                       }
-               }
+               return jobManagerRunnerTerminationFuture.thenRunAsync(
+                       () -> {
+                               jobManagerMetricGroup.removeJob(jobId);
+                               blobServer.cleanupJob(jobId, cleanupHA);
 
-               // TODO: remove job related files from blob server
+                               if (cleanupHA) {
+                                       try {
+                                               
submittedJobGraphStore.removeJobGraph(jobId);
+                                       } catch (Exception e) {
+                                               log.warn("Could not properly 
remove job {} from submitted job graph store.", jobId);
+                                       }
+                               }
+                       },
+                       getRpcService().getExecutor());
        }
 
        /**
@@ -564,8 +575,11 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
         */
        private CompletableFuture<Void> terminateJobManagerRunners() {
                log.info("Stopping all currently running jobs of dispatcher 
{}.", getAddress());
-               final List<CompletableFuture<Void>> terminationFutures = 
jobManagerRunners.values().stream()
-                       .map(JobManagerRunner::closeAsync)
+
+               final HashSet<JobID> jobsToRemove = new 
HashSet<>(jobManagerRunners.keySet());
+
+               final List<CompletableFuture<Void>> terminationFutures = 
jobsToRemove.stream()
+                       .map(jobId -> removeJob(jobId, false))
                        .collect(Collectors.toList());
 
                return FutureUtils.completeAll(terminationFutures);
@@ -620,7 +634,7 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
 
                final JobID jobId = archivedExecutionGraph.getJobID();
 
-               removeJob(jobId, true);
+               removeJobAndRegisterTerminationFuture(jobId, true);
        }
 
        private void archiveExecutionGraph(ArchivedExecutionGraph 
archivedExecutionGraph) {
@@ -651,7 +665,7 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
        protected void jobNotFinished(JobID jobId) {
                log.info("Job {} was not finished by JobManager.", jobId);
 
-               removeJob(jobId, false);
+               removeJobAndRegisterTerminationFuture(jobId, false);
        }
 
        private void jobMasterFailed(JobID jobId, Throwable cause) {
@@ -766,7 +780,6 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
        private void clearDispatcherState() {
                final CompletableFuture<Void> 
jobManagerRunnersTerminationFuture = terminateJobManagerRunners();
                
registerOrphanedJobManagerTerminationFuture(jobManagerRunnersTerminationFuture);
-               jobManagerRunners.clear();
        }
 
        /**
@@ -816,7 +829,7 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
        public void onRemovedJobGraph(final JobID jobId) {
                runAsync(() -> {
                        try {
-                               removeJob(jobId, false);
+                               removeJobAndRegisterTerminationFuture(jobId, 
false);
                        } catch (final Exception e) {
                                log.error("Could not remove job {}.", jobId, e);
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/2735c852/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
index b90e995..9bb68cc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
@@ -27,6 +27,7 @@ import org.apache.flink.util.ExceptionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
@@ -210,6 +211,13 @@ public class BlobLibraryCacheManager implements 
LibraryCacheManager {
                }
        }
 
+       @Override
+       public boolean hasClassLoader(@Nonnull JobID jobId) {
+               synchronized (lockObject) {
+                       return cacheEntries.containsKey(jobId);
+               }
+       }
+
        // 
--------------------------------------------------------------------------------------------
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/2735c852/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java
index 4509b6f..b66ab73 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java
@@ -22,6 +22,8 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.blob.PermanentBlobKey;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 
+import javax.annotation.Nonnull;
+
 import java.io.IOException;
 import java.net.URL;
 import java.util.Collection;
@@ -96,4 +98,13 @@ public interface LibraryCacheManager {
         * Shutdown method which may release created class loaders.
         */
        void shutdown();
+
+       /**
+        * True if the LibraryCacheManager has a user code class loader 
registered
+        * for the given job id.
+        *
+        * @param jobId identifying the job for which to check the class loader
+        * @return true if the user code class loader for the given job has 
been registered. Otherwise false.
+        */
+       boolean hasClassLoader(@Nonnull JobID jobId);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2735c852/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index 9b80820..f04e4af 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -219,6 +219,9 @@ public class JobManagerRunner implements LeaderContender, 
OnCompletionActions, A
                                                        throwable = 
ExceptionUtils.firstOrSuppressed(t, 
ExceptionUtils.stripCompletionException(throwable));
                                                }
 
+                                               final LibraryCacheManager 
libraryCacheManager = jobManagerSharedServices.getLibraryCacheManager();
+                                               
libraryCacheManager.unregisterJob(jobGraph.getJobID());
+
                                                if (throwable != null) {
                                                        
terminationFuture.completeExceptionally(
                                                                new 
FlinkException("Could not properly shut down the JobManagerRunner", throwable));
@@ -246,8 +249,8 @@ public class JobManagerRunner implements LeaderContender, 
OnCompletionActions, A
         */
        @Override
        public void jobReachedGloballyTerminalState(ArchivedExecutionGraph 
executionGraph) {
-               // complete the result future with the terminal execution graph
                unregisterJobFromHighAvailability();
+               // complete the result future with the terminal execution graph
                resultFuture.complete(executionGraph);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2735c852/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 64e3337..4e4eb4c 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1317,7 +1317,7 @@ class JobManager(
           log.error(s"Failed to submit job $jobId ($jobName)", t)
 
           libraryCacheManager.unregisterJob(jobId)
-          blobServer.cleanupJob(jobId)
+          blobServer.cleanupJob(jobId, true)
           currentJobs.remove(jobId)
 
           if (executionGraph != null) {
@@ -1759,7 +1759,7 @@ class JobManager(
 
     // remove all job-related BLOBs from local and HA store
     libraryCacheManager.unregisterJob(jobID)
-    blobServer.cleanupJob(jobID)
+    blobServer.cleanupJob(jobID, removeJobFromStateBackend)
 
     jobManagerMetricGroup.removeJob(jobID)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2735c852/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java
index 93371f7..3f8dee6 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java
@@ -368,7 +368,7 @@ public class BlobCacheCleanupTest extends TestLogger {
 
                        // files are cached now for the given TTL - remove from 
server so that they are not re-downloaded
                        if (jobId != null) {
-                               server.cleanupJob(jobId);
+                               server.cleanupJob(jobId, true);
                        } else {
                                server.deleteFromCache(key1);
                                server.deleteFromCache(key2);

http://git-wip-us.apache.org/repos/asf/flink/blob/2735c852/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
index 2168034..446ddfe 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
@@ -300,7 +300,7 @@ public class BlobServerDeleteTest extends TestLogger {
                        verifyContents(server, jobId2, key2, data);
                        checkFileCountForJob(1, jobId2, server);
 
-                       server.cleanupJob(jobId1);
+                       server.cleanupJob(jobId1, true);
 
                        verifyDeleted(server, jobId1, key1a);
                        verifyDeleted(server, jobId1, key1b);
@@ -308,14 +308,14 @@ public class BlobServerDeleteTest extends TestLogger {
                        verifyContents(server, jobId2, key2, data);
                        checkFileCountForJob(1, jobId2, server);
 
-                       server.cleanupJob(jobId2);
+                       server.cleanupJob(jobId2, true);
 
                        checkFileCountForJob(0, jobId1, server);
                        verifyDeleted(server, jobId2, key2);
                        checkFileCountForJob(0, jobId2, server);
 
                        // calling a second time should not fail
-                       server.cleanupJob(jobId2);
+                       server.cleanupJob(jobId2, true);
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2735c852/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRecoveryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRecoveryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRecoveryTest.java
index 104134b..9248578 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRecoveryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRecoveryTest.java
@@ -139,8 +139,8 @@ public class BlobServerRecoveryTest extends TestLogger {
                        verifyDeleted(cache1, jobId[0], nonHAKey);
 
                        // Remove again
-                       server1.cleanupJob(jobId[0]);
-                       server1.cleanupJob(jobId[1]);
+                       server1.cleanupJob(jobId[0], true);
+                       server1.cleanupJob(jobId[1], true);
 
                        // Verify everything is clean
                        assertTrue("HA storage directory does not exist", 
fs.exists(new Path(storagePath)));

http://git-wip-us.apache.org/repos/asf/flink/blob/2735c852/flink-runtime/src/test/java/org/apache/flink/runtime/blob/TestingBlobStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/TestingBlobStore.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/TestingBlobStore.java
new file mode 100644
index 0000000..6b81530
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/TestingBlobStore.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple3;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+/**
+ * {@link BlobStore} implementation for testing purposes.
+ */
+public class TestingBlobStore implements BlobStore {
+
+       @Nonnull
+       private final Function<Tuple3<File, JobID, BlobKey>, Boolean> 
putFunction;
+
+       @Nonnull
+       private final BiFunction<JobID, BlobKey, Boolean> deleteFunction;
+
+       @Nonnull
+       private final Function<JobID, Boolean> deleteAllFunction;
+
+       @Nonnull
+       private final Function<Tuple3<JobID, BlobKey, File>, Boolean> 
getFunction;
+
+       public TestingBlobStore(@Nonnull Function<Tuple3<File, JobID, BlobKey>, 
Boolean> putFunction, @Nonnull BiFunction<JobID, BlobKey, Boolean> 
deleteFunction, @Nonnull Function<JobID, Boolean> deleteAllFunction, @Nonnull 
Function<Tuple3<JobID, BlobKey, File>, Boolean> getFunction) {
+               this.putFunction = putFunction;
+               this.deleteFunction = deleteFunction;
+               this.deleteAllFunction = deleteAllFunction;
+               this.getFunction = getFunction;
+       }
+
+       @Override
+       public boolean put(File localFile, JobID jobId, BlobKey blobKey) {
+               return putFunction.apply(Tuple3.of(localFile, jobId, blobKey));
+       }
+
+       @Override
+       public boolean delete(JobID jobId, BlobKey blobKey) {
+               return deleteFunction.apply(jobId, blobKey);
+       }
+
+       @Override
+       public boolean deleteAll(JobID jobId) {
+               return deleteAllFunction.apply(jobId);
+       }
+
+       @Override
+       public boolean get(JobID jobId, BlobKey blobKey, File localFile) {
+               return getFunction.apply(Tuple3.of(jobId, blobKey, localFile));
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2735c852/flink-runtime/src/test/java/org/apache/flink/runtime/blob/TestingBlobStoreBuilder.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/TestingBlobStoreBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/TestingBlobStoreBuilder.java
new file mode 100644
index 0000000..d03a01c
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/TestingBlobStoreBuilder.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple3;
+
+import java.io.File;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+/**
+ * Builder for the {@link TestingBlobStore}.
+ */
+public class TestingBlobStoreBuilder {
+       private static final Function<Tuple3<File, JobID, BlobKey>, Boolean> 
DEFAULT_PUT_FUNCTION = ignored -> true;
+       private static final BiFunction<JobID, BlobKey, Boolean> 
DEFAULT_DELETE_FUNCTION = (ignoredA, ignoredB) -> true;
+       private static final Function<JobID, Boolean> 
DEFAULT_DELETE_ALL_FUNCTION = ignored -> true;
+       private static final Function<Tuple3<JobID, BlobKey, File>, Boolean> 
DEFAULT_GET_FUNCTION = ignored -> true;
+
+       private Function<Tuple3<File, JobID, BlobKey>, Boolean> putFunction = 
DEFAULT_PUT_FUNCTION;
+       private BiFunction<JobID, BlobKey, Boolean> deleteFunction = 
DEFAULT_DELETE_FUNCTION;
+       private Function<JobID, Boolean> deleteAllFunction = 
DEFAULT_DELETE_ALL_FUNCTION;
+       private Function<Tuple3<JobID, BlobKey, File>, Boolean> getFunction = 
DEFAULT_GET_FUNCTION;
+
+       public TestingBlobStoreBuilder setPutFunction(Function<Tuple3<File, 
JobID, BlobKey>, Boolean> putFunction) {
+               this.putFunction = putFunction;
+               return this;
+       }
+
+       public TestingBlobStoreBuilder setDeleteFunction(BiFunction<JobID, 
BlobKey, Boolean> deleteFunction) {
+               this.deleteFunction = deleteFunction;
+               return this;
+       }
+
+       public TestingBlobStoreBuilder setDeleteAllFunction(Function<JobID, 
Boolean> deleteAllFunction) {
+               this.deleteAllFunction = deleteAllFunction;
+               return this;
+       }
+
+       public TestingBlobStoreBuilder setGetFunction(Function<Tuple3<JobID, 
BlobKey, File>, Boolean> getFunction) {
+               this.getFunction = getFunction;
+               return this;
+       }
+
+       public TestingBlobStore createTestingBlobStore() {
+               return new TestingBlobStore(putFunction, deleteFunction, 
deleteAllFunction, getFunction);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2735c852/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
new file mode 100644
index 0000000..77b3fc4
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.BlobStore;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.blob.TestingBlobStore;
+import org.apache.flink.runtime.blob.TestingBlobStoreBuilder;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
+import org.apache.flink.runtime.jobmaster.JobNotFinishedException;
+import 
org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import 
org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
+import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.runtime.testutils.InMemorySubmittedJobGraphStore;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests the resource cleanup by the {@link Dispatcher}.
+ */
+public class DispatcherResourceCleanupTest extends TestLogger {
+
+       @ClassRule
+       public static TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+       private static final Time timeout = Time.seconds(10L);
+
+       private static TestingRpcService rpcService;
+
+       private JobID jobId;
+
+       private JobGraph jobGraph;
+
+       private Configuration configuration;
+
+       private TestingLeaderElectionService dispatcherLeaderElectionService;
+
+       private TestingDispatcher dispatcher;
+
+       private DispatcherGateway dispatcherGateway;
+
+       private TestingFatalErrorHandler fatalErrorHandler;
+
+       private BlobServer blobServer;
+
+       private PermanentBlobKey permanentBlobKey;
+
+       private File blobFile;
+
+       private CompletableFuture<BlobKey> storedBlobFuture;
+       private CompletableFuture<JobID> deleteAllFuture;
+       private CompletableFuture<ArchivedExecutionGraph> resultFuture;
+       private CompletableFuture<JobID> cleanupJobFuture;
+
+       /** Creates the RPC service that is shared by all tests of this class. */
+       @BeforeClass
+       public static void setupClass() {
+               DispatcherResourceCleanupTest.rpcService = new TestingRpcService();
+       }
+
+       /**
+        * Builds the per-test fixture: a single-vertex job graph, a blob server backed by a
+        * {@link TestingBlobStore} that records put and deleteAll calls in futures, one uploaded
+        * permanent blob for the job, and a started dispatcher whose job manager runners are mocks
+        * driven by {@code resultFuture}.
+        */
+       @Before
+       public void setup() throws Exception {
+               // single-vertex job that the tests submit to the dispatcher
+               final JobVertex testVertex = new JobVertex("testVertex");
+               testVertex.setInvokableClass(NoOpInvokable.class);
+               jobId = new JobID();
+               jobGraph = new JobGraph(jobId, "testJob", testVertex);
+               jobGraph.setAllowQueuedScheduling(true);
+
+               // use a fresh folder below the class-level temporary folder as blob storage directory
+               configuration = new Configuration();
+               configuration.setString(BlobServerOptions.STORAGE_DIRECTORY, 
temporaryFolder.newFolder().getAbsolutePath());
+
+               final TestingHighAvailabilityServices highAvailabilityServices 
= new TestingHighAvailabilityServices();
+               dispatcherLeaderElectionService = new 
TestingLeaderElectionService();
+               
highAvailabilityServices.setDispatcherLeaderElectionService(dispatcherLeaderElectionService);
+
+               storedBlobFuture = new CompletableFuture<>();
+               deleteAllFuture = new CompletableFuture<>();
+
+               // blob store that completes storedBlobFuture with the blob key of a put call and
+               // deleteAllFuture with the job id of a deleteAll call
+               final TestingBlobStore testingBlobStore = new 
TestingBlobStoreBuilder()
+                       .setPutFunction(
+                               putArguments -> 
storedBlobFuture.complete(putArguments.f2))
+                       .setDeleteAllFunction(deleteAllFuture::complete)
+                       .createTestingBlobStore();
+
+               cleanupJobFuture = new CompletableFuture<>();
+
+               // the TestingBlobServer completes cleanupJobFuture whenever cleanupJob has run
+               blobServer = new TestingBlobServer(configuration, 
testingBlobStore, cleanupJobFuture);
+
+               // upload a blob to the blob server
+               permanentBlobKey = blobServer.putPermanent(jobId, new 
byte[256]);
+               blobFile = blobServer.getStorageLocation(jobId, 
permanentBlobKey);
+
+               resultFuture = new CompletableFuture<>();
+
+               fatalErrorHandler = new TestingFatalErrorHandler();
+
+               // runners created by the factory are mocks: their result future is resultFuture and
+               // closeAsync() returns an already completed future
+               dispatcher = new TestingDispatcher(
+                       rpcService,
+                       Dispatcher.DISPATCHER_NAME + UUID.randomUUID(),
+                       configuration,
+                       highAvailabilityServices,
+                       new InMemorySubmittedJobGraphStore(),
+                       new TestingResourceManagerGateway(),
+                       blobServer,
+                       new HeartbeatServices(1000L, 1000L),
+                       
UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
+                       null,
+                       new MemoryArchivedExecutionGraphStore(),
+                       new TestingJobManagerRunnerFactory(resultFuture, 
CompletableFuture.completedFuture(null)),
+                       fatalErrorHandler,
+                       null,
+                       VoidHistoryServerArchivist.INSTANCE);
+
+               dispatcher.start();
+
+               dispatcherGateway = 
dispatcher.getSelfGateway(DispatcherGateway.class);
+
+               // NOTE(review): presumably grants leadership to the dispatcher and waits for the
+               // confirmation - confirm against TestingLeaderElectionService
+               
dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
+
+               // sanity check: the uploaded blob is present in the local storage location
+               assertThat(blobFile.exists(), is(true));
+
+               // verify that we stored the blob also in the BlobStore
+               assertThat(storedBlobFuture.get(), equalTo(permanentBlobKey));
+       }
+
+       /**
+        * Shuts down the dispatcher, releases the blob server created by {@code setup()} and
+        * finally surfaces any error reported to the fatal error handler.
+        *
+        * @throws Exception if the shutdown fails, the blob server cannot be closed or a fatal
+        *                   error was recorded during the test
+        */
+       @After
+       public void teardown() throws Exception {
+               try {
+                       if (dispatcher != null) {
+                               dispatcher.shutDown();
+                               dispatcher.getTerminationFuture().get();
+                       }
+               } finally {
+                       // setup() creates a new blob server per test, so it has to be closed per test as
+                       // well; close it only after the dispatcher terminated because the dispatcher
+                       // still calls into it while shutting down
+                       if (blobServer != null) {
+                               blobServer.close();
+                       }
+               }
+
+               if (fatalErrorHandler != null) {
+                       fatalErrorHandler.rethrowError();
+               }
+       }
+
+       /**
+        * Stops the shared RPC service and waits for the returned future to complete.
+        */
+       @AfterClass
+       public static void teardownClass() throws ExecutionException, InterruptedException {
+               if (rpcService == null) {
+                       // the service was never created, nothing to stop
+                       return;
+               }
+
+               rpcService.stopService().get();
+       }
+
+       /**
+        * Completing the job result with a FINISHED execution graph must trigger a job cleanup on
+        * the blob server, a deleteAll on the BlobStore and the removal of the local blob file.
+        */
+       @Test
+       public void testBlobServerCleanupWhenJobFinished() throws Exception {
+               submitJob();
+
+               // report a finished job through the result future of the mocked runner
+               final ArchivedExecutionGraph finishedJob = new ArchivedExecutionGraphBuilder()
+                       .setJobID(jobId)
+                       .setState(JobStatus.FINISHED)
+                       .build();
+               resultFuture.complete(finishedJob);
+
+               final JobID cleanedUpJob = cleanupJobFuture.get();
+               assertThat(cleanedUpJob, equalTo(jobId));
+
+               // the BlobStore must have been asked to delete all blobs of the job as well
+               final JobID deletedJob = deleteAllFuture.get();
+               assertThat(deletedJob, equalTo(jobId));
+
+               assertThat(blobFile.exists(), is(false));
+       }
+
+       /**
+        * Submits the test job graph through the dispatcher gateway and blocks until the
+        * submission future completes.
+        */
+       private void submitJob() throws InterruptedException, ExecutionException {
+               dispatcherGateway.submitJob(jobGraph, timeout).get();
+       }
+
+       /**
+        * Failing the job result with a {@link JobNotFinishedException} must still trigger a job
+        * cleanup and remove the local blob file, but deleteAll must not be called on the BlobStore.
+        */
+       @Test
+       public void testBlobServerCleanupWhenJobNotFinished() throws Exception {
+               submitJob();
+
+               // fail the result future: the job did not finish
+               final JobNotFinishedException notFinished = new JobNotFinishedException(jobId);
+               resultFuture.completeExceptionally(notFinished);
+
+               final JobID cleanedUpJob = cleanupJobFuture.get();
+               assertThat(cleanedUpJob, equalTo(jobId));
+
+               assertThat(blobFile.exists(), is(false));
+
+               // the BlobStore must stay untouched: waiting for deleteAll has to time out
+               try {
+                       deleteAllFuture.get(50L, TimeUnit.MILLISECONDS);
+                       fail("We should not delete the HA blobs.");
+               } catch (TimeoutException expected) {
+                       // deleteAll was not called within the waiting period
+               }
+
+               assertThat(deleteAllFuture.isDone(), is(false));
+       }
+
+       /**
+        * Shutting down the dispatcher while a job is submitted must trigger a job cleanup and
+        * remove the local blob file, but deleteAll must not be called on the BlobStore.
+        */
+       @Test
+       public void testBlobServerCleanupWhenClosingDispatcher() throws Exception {
+               submitJob();
+
+               // stop the dispatcher and wait until it has terminated
+               dispatcher.shutDown();
+               dispatcher.getTerminationFuture().get();
+
+               final JobID cleanedUpJob = cleanupJobFuture.get();
+               assertThat(cleanedUpJob, equalTo(jobId));
+
+               assertThat(blobFile.exists(), is(false));
+
+               // the BlobStore must stay untouched: waiting for deleteAll has to time out
+               try {
+                       deleteAllFuture.get(50L, TimeUnit.MILLISECONDS);
+                       fail("We should not delete the HA blobs.");
+               } catch (TimeoutException expected) {
+                       // deleteAll was not called within the waiting period
+               }
+
+               assertThat(deleteAllFuture.isDone(), is(false));
+       }
+
+       private static final class TestingDispatcher extends Dispatcher {
+               public TestingDispatcher(RpcService rpcService, String 
endpointId, Configuration configuration, HighAvailabilityServices 
highAvailabilityServices, SubmittedJobGraphStore submittedJobGraphStore, 
ResourceManagerGateway resourceManagerGateway, BlobServer blobServer, 
HeartbeatServices heartbeatServices, JobManagerMetricGroup 
jobManagerMetricGroup, @Nullable String metricServiceQueryPath, 
ArchivedExecutionGraphStore archivedExecutionGraphStore, 
JobManagerRunnerFactory jobManagerRunnerFactory, FatalErrorHandler 
fatalErrorHandler, @Nullable String restAddress, HistoryServerArchivist 
historyServerArchivist) throws Exception {
+                       super(rpcService, endpointId, configuration, 
highAvailabilityServices, submittedJobGraphStore, resourceManagerGateway, 
blobServer, heartbeatServices, jobManagerMetricGroup, metricServiceQueryPath, 
archivedExecutionGraphStore, jobManagerRunnerFactory, fatalErrorHandler, 
restAddress, historyServerArchivist);
+               }
+       }
+
+       private static final class TestingJobManagerRunnerFactory implements 
Dispatcher.JobManagerRunnerFactory {
+
+               private final CompletableFuture<ArchivedExecutionGraph> 
resultFuture;
+
+               private final CompletableFuture<Void> terminationFuture;
+
+               private 
TestingJobManagerRunnerFactory(CompletableFuture<ArchivedExecutionGraph> 
resultFuture, CompletableFuture<Void> terminationFuture) {
+                       this.resultFuture = resultFuture;
+                       this.terminationFuture = terminationFuture;
+               }
+
+               @Override
+               public JobManagerRunner createJobManagerRunner(ResourceID 
resourceId, JobGraph jobGraph, Configuration configuration, RpcService 
rpcService, HighAvailabilityServices highAvailabilityServices, 
HeartbeatServices heartbeatServices, BlobServer blobServer, 
JobManagerSharedServices jobManagerServices, JobManagerJobMetricGroupFactory 
jobManagerJobMetricGroupFactory, FatalErrorHandler fatalErrorHandler) {
+                       final JobManagerRunner jobManagerRunnerMock = 
mock(JobManagerRunner.class);
+
+                       
when(jobManagerRunnerMock.getResultFuture()).thenReturn(resultFuture);
+                       
when(jobManagerRunnerMock.closeAsync()).thenReturn(terminationFuture);
+
+                       return jobManagerRunnerMock;
+               }
+       }
+
+       private static final class TestingBlobServer extends BlobServer {
+
+               private final CompletableFuture<JobID> cleanupJobFuture;
+
+               /**
+                * Instantiates a new BLOB server and binds it to a free 
network port.
+                *
+                * @param config    Configuration to be used to instantiate the 
BlobServer
+                * @param blobStore BlobStore to store blobs persistently
+                * @param cleanupJobFuture
+                * @throws IOException thrown if the BLOB server cannot bind to 
a free network port or if the
+                *                     (local or distributed) file storage 
cannot be created or is not usable
+                */
+               public TestingBlobServer(Configuration config, BlobStore 
blobStore, CompletableFuture<JobID> cleanupJobFuture) throws IOException {
+                       super(config, blobStore);
+                       this.cleanupJobFuture = cleanupJobFuture;
+               }
+
+               @Override
+               public boolean cleanupJob(JobID jobId, boolean 
cleanupBlobStoreFiles) {
+                       final boolean result = super.cleanupJob(jobId, 
cleanupBlobStoreFiles);
+                       cleanupJobFuture.complete(jobId);
+                       return result;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2735c852/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
index a5f22cb..7360cf7 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
@@ -148,7 +148,7 @@ public class BlobLibraryCacheRecoveryITCase extends 
TestLogger {
                        }
 
                        // Remove blobs again
-                       server[1].cleanupJob(jobId);
+                       server[1].cleanupJob(jobId, true);
 
                        // Verify everything is clean below 
recoveryDir/<cluster_id>
                        final String clusterId = 
config.getString(HighAvailabilityOptions.HA_CLUSTER_ID);

http://git-wip-us.apache.org/repos/asf/flink/blob/2735c852/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java
index a85d351..08f6fe5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java
@@ -18,12 +18,14 @@
 
 package org.apache.flink.runtime.jobmaster;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
@@ -204,6 +206,26 @@ public class JobManagerRunnerTest extends TestLogger {
                }
        }
 
+       /**
+        * After starting the runner the {@link LibraryCacheManager} must report a class loader for
+        * the job; after closing the runner it must no longer report one.
+        */
+       @Test
+       public void testLibraryCacheManagerRegistration() throws Exception {
+               final JobManagerRunner runner = createJobManagerRunner();
+
+               try {
+                       runner.start();
+
+                       final LibraryCacheManager libraryCacheManager = jobManagerSharedServices.getLibraryCacheManager();
+                       final JobID jobID = jobGraph.getJobID();
+
+                       final boolean registeredAfterStart = libraryCacheManager.hasClassLoader(jobID);
+                       assertThat(registeredAfterStart, is(true));
+
+                       runner.close();
+
+                       final boolean registeredAfterClose = libraryCacheManager.hasClassLoader(jobID);
+                       assertThat(registeredAfterClose, is(false));
+               } finally {
+                       // also closes the runner when one of the steps above failed
+                       runner.close();
+               }
+       }
+
        @Nonnull
        private JobManagerRunner createJobManagerRunner() throws Exception {
                return new JobManagerRunner(

Reply via email to