http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala index cd7b363..61c61b4 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala @@ -18,11 +18,12 @@ package org.apache.flink.runtime.clusterframework -import java.util.concurrent.{ScheduledExecutorService, Executor} +import java.util.concurrent.{Executor, ScheduledExecutorService} import akka.actor.ActorRef import org.apache.flink.api.common.JobID import org.apache.flink.configuration.Configuration +import org.apache.flink.runtime.blob.BlobServer import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory import org.apache.flink.runtime.clusterframework.messages._ import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager @@ -51,6 +52,7 @@ import scala.language.postfixOps * @param instanceManager Instance manager to manage the registered * [[org.apache.flink.runtime.taskmanager.TaskManager]] * @param scheduler Scheduler to schedule Flink jobs + * @param blobServer Server instance to store BLOBs for the individual tasks * @param libraryCacheManager Manager to manage uploaded jar files * @param archive Archive for finished Flink jobs * @param restartStrategyFactory Restart strategy to be used in case of a job recovery @@ -63,6 +65,7 @@ abstract class ContaineredJobManager( ioExecutor: Executor, instanceManager: InstanceManager, scheduler: FlinkScheduler, + blobServer: BlobServer, libraryCacheManager: BlobLibraryCacheManager, archive: ActorRef, 
restartStrategyFactory: RestartStrategyFactory, @@ -78,6 +81,7 @@ abstract class ContaineredJobManager( ioExecutor, instanceManager, scheduler, + blobServer, libraryCacheManager, archive, restartStrategyFactory,
http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 1616a7b..8c551a7 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -126,6 +126,7 @@ class JobManager( protected val ioExecutor: Executor, protected val instanceManager: InstanceManager, protected val scheduler: FlinkScheduler, + protected val blobServer: BlobServer, protected val libraryCacheManager: BlobLibraryCacheManager, protected val archive: ActorRef, protected val restartStrategyFactory: RestartStrategyFactory, @@ -272,11 +273,12 @@ class JobManager( instanceManager.shutdown() scheduler.shutdown() + libraryCacheManager.shutdown() try { - libraryCacheManager.shutdown() + blobServer.close() } catch { - case e: IOException => log.error("Could not properly shutdown the library cache manager.", e) + case e: IOException => log.error("Could not properly shutdown the blob server.", e) } // failsafe shutdown of the metrics registry @@ -422,7 +424,7 @@ class JobManager( taskManager ! decorateMessage( AlreadyRegistered( instanceID, - libraryCacheManager.getBlobServerPort)) + blobServer.getPort)) } else { try { val actorGateway = new AkkaActorGateway(taskManager, leaderSessionID.orNull) @@ -437,7 +439,7 @@ class JobManager( taskManagerMap.put(taskManager, instanceID) taskManager ! 
decorateMessage( - AcknowledgeRegistration(instanceID, libraryCacheManager.getBlobServerPort)) + AcknowledgeRegistration(instanceID, blobServer.getPort)) // to be notified when the taskManager is no longer reachable context.watch(taskManager) @@ -839,6 +841,7 @@ class JobManager( try { log.info(s"Disposing savepoint at '$savepointPath'.") //TODO user code class loader ? + // (has not been used so far and new savepoints can simply be deleted by file) val savepoint = SavepointStore.loadSavepoint( savepointPath, Thread.currentThread().getContextClassLoader) @@ -1060,7 +1063,7 @@ class JobManager( case Some((graph, jobInfo)) => sender() ! decorateMessage( ClassloadingProps( - libraryCacheManager.getBlobServerPort, + blobServer.getPort, graph.getRequiredJarFiles, graph.getRequiredClasspaths)) case None => @@ -1068,7 +1071,7 @@ class JobManager( } case RequestBlobManagerPort => - sender ! decorateMessage(libraryCacheManager.getBlobServerPort) + sender ! decorateMessage(blobServer.getPort) case RequestArchive => sender ! 
decorateMessage(ResponseArchive(archive)) @@ -1254,8 +1257,8 @@ class JobManager( // because this makes sure that the uploaded jar files are removed in case of // unsuccessful try { - libraryCacheManager.registerJob(jobGraph.getJobID, jobGraph.getUserJarBlobKeys, - jobGraph.getClasspaths) + libraryCacheManager.registerJob( + jobGraph.getJobID, jobGraph.getUserJarBlobKeys, jobGraph.getClasspaths) } catch { case t: Throwable => @@ -1344,6 +1347,7 @@ class JobManager( log.error(s"Failed to submit job $jobId ($jobName)", t) libraryCacheManager.unregisterJob(jobId) + blobServer.cleanupJob(jobId) currentJobs.remove(jobId) if (executionGraph != null) { @@ -1785,12 +1789,10 @@ class JobManager( case None => None } - try { - libraryCacheManager.unregisterJob(jobID) - } catch { - case t: Throwable => - log.error(s"Could not properly unregister job $jobID from the library cache.", t) - } + // remove all job-related BLOBs from local and HA store + libraryCacheManager.unregisterJob(jobID) + blobServer.cleanupJob(jobID) + jobManagerMetricGroup.foreach(_.removeJob(jobID)) futureOption @@ -2463,6 +2465,7 @@ object JobManager { blobStore: BlobStore) : (InstanceManager, FlinkScheduler, + BlobServer, BlobLibraryCacheManager, RestartStrategyFactory, FiniteDuration, // timeout @@ -2474,10 +2477,6 @@ object JobManager { val timeout: FiniteDuration = AkkaUtils.getTimeout(configuration) - val cleanupInterval = configuration.getLong( - ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL, - ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000 - val restartStrategy = RestartStrategyFactory.createRestartStrategyFactory(configuration) val archiveCount = configuration.getInteger(WebOptions.ARCHIVE_COUNT) @@ -2508,21 +2507,21 @@ object JobManager { blobServer = new BlobServer(configuration, blobStore) instanceManager = new InstanceManager() scheduler = new FlinkScheduler(ExecutionContext.fromExecutor(futureExecutor)) - libraryCacheManager = new 
BlobLibraryCacheManager(blobServer, cleanupInterval) + libraryCacheManager = new BlobLibraryCacheManager(blobServer) instanceManager.addInstanceListener(scheduler) } catch { case t: Throwable => - if (libraryCacheManager != null) { - libraryCacheManager.shutdown() - } if (scheduler != null) { scheduler.shutdown() } if (instanceManager != null) { instanceManager.shutdown() } + if (libraryCacheManager != null) { + libraryCacheManager.shutdown() + } if (blobServer != null) { blobServer.close() } @@ -2554,6 +2553,7 @@ object JobManager { (instanceManager, scheduler, + blobServer, libraryCacheManager, restartStrategy, timeout, @@ -2627,6 +2627,7 @@ object JobManager { val (instanceManager, scheduler, + blobServer, libraryCacheManager, restartStrategy, timeout, @@ -2654,6 +2655,7 @@ object JobManager { ioExecutor, instanceManager, scheduler, + blobServer, libraryCacheManager, archive, restartStrategy, @@ -2693,6 +2695,7 @@ object JobManager { ioExecutor: Executor, instanceManager: InstanceManager, scheduler: FlinkScheduler, + blobServer: BlobServer, libraryCacheManager: LibraryCacheManager, archive: ActorRef, restartStrategyFactory: RestartStrategyFactory, @@ -2710,6 +2713,7 @@ object JobManager { ioExecutor, instanceManager, scheduler, + blobServer, libraryCacheManager, archive, restartStrategyFactory, http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala index 0ae00a9..dcf9dd0 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala @@ -26,6 +26,7 @@ 
import org.apache.flink.api.common.JobID import org.apache.flink.api.common.io.FileOutputFormat import org.apache.flink.configuration.{ConfigConstants, Configuration, JobManagerOptions, QueryableStateOptions, ResourceManagerOptions, TaskManagerOptions} import org.apache.flink.core.fs.Path +import org.apache.flink.runtime.blob.BlobServer import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory import org.apache.flink.runtime.clusterframework.FlinkResourceManager import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager @@ -133,6 +134,7 @@ class LocalFlinkMiniCluster( val (instanceManager, scheduler, + blobServer, libraryCacheManager, restartStrategyFactory, timeout, @@ -164,6 +166,7 @@ class LocalFlinkMiniCluster( ioExecutor, instanceManager, scheduler, + blobServer, libraryCacheManager, archive, restartStrategyFactory, @@ -279,6 +282,7 @@ class LocalFlinkMiniCluster( ioExecutor: Executor, instanceManager: InstanceManager, scheduler: Scheduler, + blobServer: BlobServer, libraryCacheManager: BlobLibraryCacheManager, archive: ActorRef, restartStrategyFactory: RestartStrategyFactory, @@ -297,6 +301,7 @@ class LocalFlinkMiniCluster( ioExecutor, instanceManager, scheduler, + blobServer, libraryCacheManager, archive, restartStrategyFactory, http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index 0c419eb..431adb6 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -35,7 +35,7 @@ import org.apache.flink.configuration._ import 
org.apache.flink.core.fs.FileSystem import org.apache.flink.runtime.accumulators.AccumulatorSnapshot import org.apache.flink.runtime.akka.{AkkaUtils, DefaultQuarantineHandler, QuarantineMonitor} -import org.apache.flink.runtime.blob.{BlobCache, BlobClient, BlobService} +import org.apache.flink.runtime.blob.{BlobCache, BlobClient} import org.apache.flink.runtime.broadcast.BroadcastVariableManager import org.apache.flink.runtime.clusterframework.messages.StopCluster import org.apache.flink.runtime.clusterframework.types.ResourceID @@ -160,7 +160,7 @@ class TaskManager( * registered at the job manager */ private val waitForRegistration = scala.collection.mutable.Set[ActorRef]() - private var blobService: Option[BlobService] = None + private var blobCache: Option[BlobCache] = None private var libraryCacheManager: Option[LibraryCacheManager] = None /* The current leading JobManager Actor associated with */ @@ -333,11 +333,11 @@ class TaskManager( killTaskManagerFatal(message, cause) case RequestTaskManagerLog(requestType : LogTypeRequest) => - blobService match { + blobCache match { case Some(_) => handleRequestTaskManagerLog(sender(), requestType, currentJobManager.get) case None => - sender() ! akka.actor.Status.Failure(new IOException("BlobService not " + + sender() ! akka.actor.Status.Failure(new IOException("BlobCache not " + "available. 
Cannot upload TaskManager logs.")) } @@ -840,7 +840,7 @@ class TaskManager( if (file.exists()) { val fis = new FileInputStream(file); Future { - val client: BlobClient = blobService.get.createClient() + val client: BlobClient = blobCache.get.createClient() client.put(fis); }(context.dispatcher) .onComplete { @@ -915,7 +915,7 @@ class TaskManager( "starting network stack and library cache.") // sanity check that the JobManager dependent components are not set up currently - if (connectionUtils.isDefined || blobService.isDefined) { + if (connectionUtils.isDefined || blobCache.isDefined) { throw new IllegalStateException("JobManager-specific components are already initialized.") } @@ -968,9 +968,9 @@ class TaskManager( address, config.getConfiguration(), highAvailabilityServices.createBlobStore()) - blobService = Option(blobcache) + blobCache = Option(blobcache) libraryCacheManager = Some( - new BlobLibraryCacheManager(blobcache, config.getCleanupInterval())) + new BlobLibraryCacheManager(blobcache)) } catch { case e: Exception => @@ -1047,18 +1047,11 @@ class TaskManager( // shut down BLOB and library cache libraryCacheManager foreach { - manager => - try { - manager.shutdown() - } catch { - case ioe: IOException => log.error( - "Could not properly shutdown library cache manager.", - ioe) - } + manager => manager.shutdown() } libraryCacheManager = None - blobService foreach { + blobCache foreach { service => try { service.close() @@ -1066,7 +1059,7 @@ class TaskManager( case ioe: IOException => log.error("Could not properly shutdown blob service.", ioe) } } - blobService = None + blobCache = None // disassociate the slot environment connectionUtils = None @@ -1130,6 +1123,10 @@ class TaskManager( case Some(manager) => manager case None => throw new IllegalStateException("There is no valid library cache manager.") } + val blobCache = this.blobCache match { + case Some(manager) => manager + case None => throw new IllegalStateException("There is no valid BLOB cache.") + 
} val slot = tdd.getTargetSlotNumber if (slot < 0 || slot >= numberOfSlots) { @@ -1200,6 +1197,7 @@ class TaskManager( taskManagerConnection, inputSplitProvider, checkpointResponder, + blobCache, libCache, fileCache, config, http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java new file mode 100644 index 0000000..afd365b --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java @@ -0,0 +1,328 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.blob; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.BlobServerOptions; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.TestLogger; + +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +/** + * A few tests for the deferred ref-counting based cleanup inside the {@link BlobCache}. + */ +public class BlobCacheCleanupTest extends TestLogger { + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + /** + * Tests that {@link BlobCache} cleans up after calling {@link BlobCache#releaseJob(JobID)}. + */ + @Test + public void testJobCleanup() throws IOException, InterruptedException { + + JobID jobId = new JobID(); + List<BlobKey> keys = new ArrayList<BlobKey>(); + BlobServer server = null; + BlobCache cache = null; + + final byte[] buf = new byte[128]; + + try { + Configuration config = new Configuration(); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, + temporaryFolder.newFolder().getAbsolutePath()); + config.setLong(BlobServerOptions.CLEANUP_INTERVAL, 1L); + + server = new BlobServer(config, new VoidBlobStore()); + InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); + + // upload blobs + try (BlobClient bc = new BlobClient(serverAddress, config)) { + keys.add(bc.put(jobId, buf)); + buf[0] += 1; + keys.add(bc.put(jobId, buf)); + } + + cache = new BlobCache(serverAddress, config, new VoidBlobStore()); + + checkFileCountForJob(2, jobId, server); + checkFileCountForJob(0, jobId, cache); + + // register once + cache.registerJob(jobId); 
+ + checkFileCountForJob(2, jobId, server); + checkFileCountForJob(0, jobId, cache); + + for (BlobKey key : keys) { + cache.getFile(jobId, key); + } + + // register again (let's say, from another thread or so) + cache.registerJob(jobId); + for (BlobKey key : keys) { + cache.getFile(jobId, key); + } + + assertEquals(2, checkFilesExist(jobId, keys, cache, true)); + checkFileCountForJob(2, jobId, server); + checkFileCountForJob(2, jobId, cache); + + // after releasing once, nothing should change + cache.releaseJob(jobId); + + assertEquals(2, checkFilesExist(jobId, keys, cache, true)); + checkFileCountForJob(2, jobId, server); + checkFileCountForJob(2, jobId, cache); + + // after releasing the second time, the job is up for deferred cleanup + cache.releaseJob(jobId); + + // because we cannot guarantee that there are not thread races in the build system, we + // loop for a certain while until the references disappear + { + long deadline = System.currentTimeMillis() + 30_000L; + do { + Thread.sleep(100); + } + while (checkFilesExist(jobId, keys, cache, false) != 0 && + System.currentTimeMillis() < deadline); + } + + // the blob cache should no longer contain the files + // this fails if we exited via a timeout + checkFileCountForJob(0, jobId, cache); + // server should be unaffected + checkFileCountForJob(2, jobId, server); + } + finally { + if (cache != null) { + cache.close(); + } + + if (server != null) { + server.close(); + } + // now everything should be cleaned up + checkFileCountForJob(0, jobId, server); + } + } + + /** + * Tests that {@link BlobCache} cleans up after calling {@link BlobCache#releaseJob(JobID)} + * but only after preserving the file for a bit longer. 
+ */ + @Test + @Ignore("manual test due to stalling: ensures a BLOB is retained first and only deleted after the (long) timeout ") + public void testJobDeferredCleanup() throws IOException, InterruptedException { + // file should be deleted between 5 and 10s after last job release + long cleanupInterval = 5L; + + JobID jobId = new JobID(); + List<BlobKey> keys = new ArrayList<BlobKey>(); + BlobServer server = null; + BlobCache cache = null; + + final byte[] buf = new byte[128]; + + try { + Configuration config = new Configuration(); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, + temporaryFolder.newFolder().getAbsolutePath()); + config.setLong(BlobServerOptions.CLEANUP_INTERVAL, cleanupInterval); + + server = new BlobServer(config, new VoidBlobStore()); + InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); + + // upload blobs + try (BlobClient bc = new BlobClient(serverAddress, config)) { + keys.add(bc.put(jobId, buf)); + buf[0] += 1; + keys.add(bc.put(jobId, buf)); + } + + cache = new BlobCache(serverAddress, config, new VoidBlobStore()); + + checkFileCountForJob(2, jobId, server); + checkFileCountForJob(0, jobId, cache); + + // register once + cache.registerJob(jobId); + + checkFileCountForJob(2, jobId, server); + checkFileCountForJob(0, jobId, cache); + + for (BlobKey key : keys) { + cache.getFile(jobId, key); + } + + // register again (let's say, from another thread or so) + cache.registerJob(jobId); + for (BlobKey key : keys) { + cache.getFile(jobId, key); + } + + assertEquals(2, checkFilesExist(jobId, keys, cache, true)); + checkFileCountForJob(2, jobId, server); + checkFileCountForJob(2, jobId, cache); + + // after releasing once, nothing should change + cache.releaseJob(jobId); + + assertEquals(2, checkFilesExist(jobId, keys, cache, true)); + checkFileCountForJob(2, jobId, server); + checkFileCountForJob(2, jobId, cache); + + // after releasing the second time, the job is up for deferred cleanup + 
cache.releaseJob(jobId); + + // files should still be accessible for now + assertEquals(2, checkFilesExist(jobId, keys, cache, true)); + checkFileCountForJob(2, jobId, cache); + + Thread.sleep(cleanupInterval / 5); + // still accessible... + assertEquals(2, checkFilesExist(jobId, keys, cache, true)); + checkFileCountForJob(2, jobId, cache); + + Thread.sleep((cleanupInterval * 4) / 5); + + // files are up for cleanup now...wait for it: + // because we cannot guarantee that there are not thread races in the build system, we + // loop for a certain while until the references disappear + { + long deadline = System.currentTimeMillis() + 30_000L; + do { + Thread.sleep(100); + } + while (checkFilesExist(jobId, keys, cache, false) != 0 && + System.currentTimeMillis() < deadline); + } + + // the blob cache should no longer contain the files + // this fails if we exited via a timeout + checkFileCountForJob(0, jobId, cache); + // server should be unaffected + checkFileCountForJob(2, jobId, server); + } + finally { + if (cache != null) { + cache.close(); + } + + if (server != null) { + server.close(); + } + // now everything should be cleaned up + checkFileCountForJob(0, jobId, server); + } + } + + /** + * Checks how many of the files given by blob keys are accessible. 
+ * + * @param jobId + * ID of a job + * @param keys + * blob keys to check + * @param blobService + * BLOB store to use + * @param doThrow + * whether exceptions should be ignored (<tt>false</tt>), or thrown (<tt>true</tt>) + * + * @return number of files we were able to retrieve via {@link BlobService#getFile} + */ + public static int checkFilesExist( + JobID jobId, Collection<BlobKey> keys, BlobService blobService, boolean doThrow) + throws IOException { + + int numFiles = 0; + + for (BlobKey key : keys) { + final File blobFile; + if (blobService instanceof BlobServer) { + BlobServer server = (BlobServer) blobService; + blobFile = server.getStorageLocation(jobId, key); + } else { + BlobCache cache = (BlobCache) blobService; + blobFile = cache.getStorageLocation(jobId, key); + } + if (blobFile.exists()) { + ++numFiles; + } else if (doThrow) { + throw new IOException("File " + blobFile + " does not exist."); + } + } + + return numFiles; + } + + /** + * Checks how many of the files given by blob keys are accessible. 
+ * + * @param expectedCount + * number of expected files in the blob service for the given job + * @param jobId + * ID of a job + * @param blobService + * BLOB store to use + * + * @return number of files we were able to retrieve via {@link BlobService#getFile} + */ + public static void checkFileCountForJob( + int expectedCount, JobID jobId, BlobService blobService) + throws IOException { + + final File jobDir; + if (blobService instanceof BlobServer) { + BlobServer server = (BlobServer) blobService; + jobDir = server.getStorageLocation(jobId, new BlobKey()).getParentFile(); + } else { + BlobCache cache = (BlobCache) blobService; + jobDir = cache.getStorageLocation(jobId, new BlobKey()).getParentFile(); + } + File[] blobsForJob = jobDir.listFiles(); + if (blobsForJob == null) { + if (expectedCount != 0) { + throw new IOException("File " + jobDir + " does not exist."); + } + } else { + assertEquals("Too many/few files in job dir: " + + Arrays.asList(blobsForJob).toString(), expectedCount, + blobsForJob.length); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java index 8c575a9..0060ccb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java @@ -22,6 +22,8 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.BlobServerOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.util.TestLogger; + import org.junit.Rule; import org.junit.Test; import 
org.junit.rules.TemporaryFolder; @@ -37,7 +39,7 @@ import static org.junit.Assert.*; /** * Unit tests for the blob cache retrying the connection to the server. */ -public class BlobCacheRetriesTest { +public class BlobCacheRetriesTest extends TestLogger { @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java index d511e86..6d6bfd5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java @@ -39,6 +39,8 @@ import java.util.Collections; import java.util.List; import org.apache.flink.api.common.JobID; +import org.apache.flink.util.TestLogger; + import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; @@ -46,7 +48,7 @@ import static org.junit.Assert.fail; /** * This class contains unit tests for the {@link BlobClient}. */ -public class BlobClientTest { +public class BlobClientTest extends TestLogger { /** The buffer size used during the tests in bytes. */ private static final int TEST_BUFFER_SIZE = 17 * 1000; @@ -214,7 +216,7 @@ public class BlobClientTest { * Tests the PUT/GET operations for content-addressable buffers. 
*/ @Test - public void testContentAddressableBuffer() { + public void testContentAddressableBuffer() throws IOException { BlobClient client = null; @@ -256,10 +258,6 @@ public class BlobClientTest { // expected } } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } finally { if (client != null) { try { @@ -281,7 +279,7 @@ public class BlobClientTest { * Tests the PUT/GET operations for content-addressable streams. */ @Test - public void testContentAddressableStream() { + public void testContentAddressableStream() throws IOException { BlobClient client = null; InputStream is = null; @@ -313,10 +311,6 @@ public class BlobClientTest { validateGetAndClose(client.get(receivedKey), testFile); validateGetAndClose(client.get(jobId, receivedKey), testFile); } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } finally { if (is != null) { try { @@ -332,7 +326,7 @@ public class BlobClientTest { } /** - * Tests the static {@link BlobClient#uploadJarFiles(InetSocketAddress, Configuration, List)} helper. + * Tests the static {@link BlobClient#uploadJarFiles(InetSocketAddress, Configuration, JobID, List)} helper. */ @Test public void testUploadJarFilesHelper() throws Exception { @@ -340,7 +334,7 @@ public class BlobClientTest { } /** - * Tests the static {@link BlobClient#uploadJarFiles(InetSocketAddress, Configuration, List)} helper. + * Tests the static {@link BlobClient#uploadJarFiles(InetSocketAddress, Configuration, JobID, List)}} helper. 
*/ static void uploadJarFile(BlobServer blobServer, Configuration blobClientConfig) throws Exception { final File testFile = File.createTempFile("testfile", ".dat"); @@ -354,15 +348,16 @@ public class BlobClientTest { } private static void uploadJarFile( - final InetSocketAddress serverAddress, final Configuration blobClientConfig, - final File testFile) throws IOException { + final InetSocketAddress serverAddress, final Configuration blobClientConfig, + final File testFile) throws IOException { + JobID jobId = new JobID(); List<BlobKey> blobKeys = BlobClient.uploadJarFiles(serverAddress, blobClientConfig, - Collections.singletonList(new Path(testFile.toURI()))); + jobId, Collections.singletonList(new Path(testFile.toURI()))); assertEquals(1, blobKeys.size()); try (BlobClient blobClient = new BlobClient(serverAddress, blobClientConfig)) { - validateGetAndClose(blobClient.get(blobKeys.get(0)), testFile); + validateGetAndClose(blobClient.get(jobId, blobKeys.get(0)), testFile); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java index 4071a1c..43bc622 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java @@ -29,12 +29,14 @@ import java.io.IOException; import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.flink.util.StringUtils; +import org.apache.flink.util.TestLogger; + import org.junit.Test; /** * This class contains unit tests for the {@link BlobKey} class. */ -public final class BlobKeyTest { +public final class BlobKeyTest extends TestLogger { /** * The first key array to be used during the unit tests. 
*/ @@ -106,4 +108,4 @@ public final class BlobKeyTest { assertEquals(k1, k2); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java index 413e2e9..6bb5ab5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java @@ -41,6 +41,8 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import static org.apache.flink.runtime.blob.BlobCacheCleanupTest.checkFileCountForJob; +import static org.apache.flink.runtime.blob.BlobCacheCleanupTest.checkFilesExist; import static org.apache.flink.runtime.blob.BlobClientTest.validateGetAndClose; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -62,7 +64,7 @@ public class BlobServerDeleteTest extends TestLogger { public TemporaryFolder temporaryFolder = new TemporaryFolder(); @Test - public void testDeleteSingleByBlobKey() { + public void testDeleteSingleByBlobKey() throws IOException { BlobServer server = null; BlobClient client = null; BlobStore blobStore = new VoidBlobStore(); @@ -131,10 +133,6 @@ public class BlobServerDeleteTest extends TestLogger { // expected } } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } finally { cleanup(server, client); } @@ -153,16 +151,16 @@ public class BlobServerDeleteTest extends TestLogger { } @Test - public void testDeleteAlreadyDeletedNoJob() { + public void testDeleteAlreadyDeletedNoJob() throws IOException { testDeleteAlreadyDeleted(null); } @Test 
- public void testDeleteAlreadyDeletedForJob() { + public void testDeleteAlreadyDeletedForJob() throws IOException { testDeleteAlreadyDeleted(new JobID()); } - private void testDeleteAlreadyDeleted(final JobID jobId) { + private void testDeleteAlreadyDeleted(final JobID jobId) throws IOException { BlobServer server = null; BlobClient client = null; BlobStore blobStore = new VoidBlobStore(); @@ -201,10 +199,6 @@ public class BlobServerDeleteTest extends TestLogger { server.delete(jobId, key); } } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } finally { cleanup(server, client); } @@ -219,16 +213,16 @@ public class BlobServerDeleteTest extends TestLogger { } @Test - public void testDeleteFailsNoJob() { + public void testDeleteFailsNoJob() throws IOException { testDeleteFails(null); } @Test - public void testDeleteFailsForJob() { + public void testDeleteFailsForJob() throws IOException { testDeleteFails(new JobID()); } - private void testDeleteFails(final JobID jobId) { + private void testDeleteFails(final JobID jobId) throws IOException { assumeTrue(!OperatingSystem.isWindows()); //setWritable doesn't work on Windows. BlobServer server = null; @@ -275,9 +269,6 @@ public class BlobServerDeleteTest extends TestLogger { } else { server.getFile(jobId, key); } - } catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); } finally { if (blobFile != null && directory != null) { //noinspection ResultOfMethodCallIgnored @@ -290,6 +281,64 @@ public class BlobServerDeleteTest extends TestLogger { } /** + * Tests that {@link BlobServer} cleans up after calling {@link BlobServer#cleanupJob(JobID)}. 
+ */ + @Test + public void testJobCleanup() throws IOException, InterruptedException { + + JobID jobId1 = new JobID(); + List<BlobKey> keys1 = new ArrayList<BlobKey>(); + JobID jobId2 = new JobID(); + List<BlobKey> keys2 = new ArrayList<BlobKey>(); + BlobServer server = null; + + final byte[] buf = new byte[128]; + + try { + Configuration config = new Configuration(); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, + temporaryFolder.newFolder().getAbsolutePath()); + + server = new BlobServer(config, new VoidBlobStore()); + InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); + BlobClient bc = new BlobClient(serverAddress, config); + + keys1.add(bc.put(jobId1, buf)); + keys2.add(bc.put(jobId2, buf)); + assertEquals(keys2.get(0), keys1.get(0)); + + buf[0] += 1; + keys1.add(bc.put(jobId1, buf)); + + bc.close(); + + assertEquals(2, checkFilesExist(jobId1, keys1, server, true)); + checkFileCountForJob(2, jobId1, server); + assertEquals(1, checkFilesExist(jobId2, keys2, server, true)); + checkFileCountForJob(1, jobId2, server); + + server.cleanupJob(jobId1); + + checkFileCountForJob(0, jobId1, server); + assertEquals(1, checkFilesExist(jobId2, keys2, server, true)); + checkFileCountForJob(1, jobId2, server); + + server.cleanupJob(jobId2); + + checkFileCountForJob(0, jobId1, server); + checkFileCountForJob(0, jobId2, server); + + // calling a second time should not fail + server.cleanupJob(jobId2); + } + finally { + if (server != null) { + server.close(); + } + } + } + + /** * FLINK-6020 * * Tests that concurrent delete operations don't interfere with each other. 
http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java index e449aab..a6ac447 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.blob; import org.apache.flink.api.common.JobID; import org.apache.flink.util.OperatingSystem; +import org.apache.flink.util.TestLogger; import org.junit.After; import org.junit.Before; @@ -34,7 +35,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assume.assumeTrue; import static org.mockito.Mockito.mock; -public class BlobUtilsTest { +public class BlobUtilsTest extends TestLogger { private final static String CANNOT_CREATE_THIS = "cannot-create-this"; http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java index ec1bbd8..c58e3a0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java @@ -33,6 +33,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.runtime.testingUtils.TestingUtils; +import 
org.apache.flink.runtime.testtasks.FailingBlockingInvokable; import org.apache.flink.util.TestLogger; import org.junit.Test; @@ -190,26 +191,4 @@ public class CoordinatorShutdownTest extends TestLogger { } } - public static class FailingBlockingInvokable extends AbstractInvokable { - private static boolean blocking = true; - private static final Object lock = new Object(); - - @Override - public void invoke() throws Exception { - while (blocking) { - synchronized (lock) { - lock.wait(); - } - } - throw new RuntimeException("This exception is expected."); - } - - public static void unblock() { - blocking = false; - - synchronized (lock) { - lock.notifyAll(); - } - } - } } http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java index 3814684..4237327 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java @@ -22,7 +22,6 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.blob.BlobServer; -import org.apache.flink.runtime.blob.BlobService; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; @@ -137,7 +136,7 @@ public class DispatcherTest extends TestLogger { Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, - BlobService blobService, + BlobServer blobServer, HeartbeatServices 
heartbeatServices, MetricRegistry metricRegistry, OnCompletionActions onCompleteActions, http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java index b43a307..a4b48e8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java @@ -18,24 +18,22 @@ package org.apache.flink.runtime.execution.librarycache; +import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.BlobServerOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.blob.BlobCache; import org.apache.flink.runtime.blob.BlobClient; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.blob.BlobServer; -import org.apache.flink.runtime.blob.BlobService; import org.apache.flink.runtime.blob.VoidBlobStore; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.api.common.JobID; import org.apache.flink.util.OperatingSystem; +import org.apache.flink.util.TestLogger; + import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; -import static org.junit.Assert.*; -import static org.junit.Assume.assumeTrue; - import java.io.File; import java.io.IOException; import java.net.InetSocketAddress; @@ -45,7 +43,19 @@ import java.util.Collection; import java.util.Collections; import java.util.List; -public class BlobLibraryCacheManagerTest { +import static 
org.apache.flink.runtime.blob.BlobCacheCleanupTest.checkFileCountForJob; +import static org.apache.flink.runtime.blob.BlobCacheCleanupTest.checkFilesExist; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.junit.Assume.assumeTrue; + +/** + * Tests for {@link BlobLibraryCacheManager}. + */ +public class BlobLibraryCacheManagerTest extends TestLogger { @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); @@ -57,10 +67,13 @@ public class BlobLibraryCacheManagerTest { @Test public void testLibraryCacheManagerJobCleanup() throws IOException, InterruptedException { - JobID jid = new JobID(); - List<BlobKey> keys = new ArrayList<BlobKey>(); + JobID jobId1 = new JobID(); + JobID jobId2 = new JobID(); + List<BlobKey> keys1 = new ArrayList<>(); + List<BlobKey> keys2 = new ArrayList<>(); BlobServer server = null; - BlobLibraryCacheManager libraryCacheManager = null; + BlobCache cache = null; + BlobLibraryCacheManager libCache = null; final byte[] buf = new byte[128]; @@ -68,122 +81,231 @@ public class BlobLibraryCacheManagerTest { Configuration config = new Configuration(); config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath()); + config.setLong(BlobServerOptions.CLEANUP_INTERVAL, 1L); server = new BlobServer(config, new VoidBlobStore()); - InetSocketAddress blobSocketAddress = new InetSocketAddress(server.getPort()); - BlobClient bc = new BlobClient(blobSocketAddress, config); - - // TODO: make use of job-related BLOBs after adapting the BlobLibraryCacheManager - JobID jobId = null; + InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); + BlobClient bc = new BlobClient(serverAddress, config); + cache = new BlobCache(serverAddress, config, new VoidBlobStore()); - keys.add(bc.put(jobId, 
buf)); + keys1.add(bc.put(jobId1, buf)); buf[0] += 1; - keys.add(bc.put(jobId, buf)); + keys1.add(bc.put(jobId1, buf)); + keys2.add(bc.put(jobId2, buf)); bc.close(); - long cleanupInterval = 1000L; - libraryCacheManager = new BlobLibraryCacheManager(server, cleanupInterval); - libraryCacheManager.registerJob(jid, keys, Collections.<URL>emptyList()); + libCache = new BlobLibraryCacheManager(cache); + cache.registerJob(jobId1); + cache.registerJob(jobId2); + + assertEquals(0, libCache.getNumberOfManagedJobs()); + assertEquals(0, libCache.getNumberOfReferenceHolders(jobId1)); + checkFileCountForJob(2, jobId1, server); + checkFileCountForJob(0, jobId1, cache); + checkFileCountForJob(1, jobId2, server); + checkFileCountForJob(0, jobId2, cache); + + libCache.registerJob(jobId1, keys1, Collections.<URL>emptyList()); + ClassLoader classLoader1 = libCache.getClassLoader(jobId1); + + assertEquals(1, libCache.getNumberOfManagedJobs()); + assertEquals(1, libCache.getNumberOfReferenceHolders(jobId1)); + assertEquals(0, libCache.getNumberOfReferenceHolders(jobId2)); + assertEquals(2, checkFilesExist(jobId1, keys1, cache, true)); + checkFileCountForJob(2, jobId1, server); + checkFileCountForJob(2, jobId1, cache); + assertEquals(0, checkFilesExist(jobId2, keys2, cache, false)); + checkFileCountForJob(1, jobId2, server); + checkFileCountForJob(0, jobId2, cache); + + libCache.registerJob(jobId2, keys2, Collections.<URL>emptyList()); + ClassLoader classLoader2 = libCache.getClassLoader(jobId2); + assertNotEquals(classLoader1, classLoader2); - assertEquals(2, checkFilesExist(jobId, keys, server, true)); - assertEquals(2, libraryCacheManager.getNumberOfCachedLibraries()); - assertEquals(1, libraryCacheManager.getNumberOfReferenceHolders(jid)); - - libraryCacheManager.unregisterJob(jid); - - // because we cannot guarantee that there are not thread races in the build system, we - // loop for a certain while until the references disappear - { - long deadline = System.currentTimeMillis() + 
30000; - do { - Thread.sleep(500); - } - while (libraryCacheManager.getNumberOfCachedLibraries() > 0 && - System.currentTimeMillis() < deadline); + try { + libCache.registerJob(jobId2, keys1, Collections.<URL>emptyList()); + fail("Should fail with an IllegalStateException"); + } + catch (IllegalStateException e) { + // that's what we want } - - // this fails if we exited via a timeout - assertEquals(0, libraryCacheManager.getNumberOfCachedLibraries()); - assertEquals(0, libraryCacheManager.getNumberOfReferenceHolders(jid)); - - // the blob cache should no longer contain the files - assertEquals(0, checkFilesExist(jobId, keys, server, false)); try { - if (jobId == null) { - server.getFile(keys.get(0)); - } else { - server.getFile(jobId, keys.get(0)); - } - fail("BLOB should have been deleted"); - } catch (IOException e) { - // expected + libCache.registerJob( + jobId2, keys2, + Collections.singletonList(new URL("file:///tmp/does-not-exist"))); + fail("Should fail with an IllegalStateException"); } - try { - if (jobId == null) { - server.getFile(keys.get(1)); - } else { - server.getFile(jobId, keys.get(1)); - } - fail("BLOB should have been deleted"); - } catch (IOException e) { - // expected + catch (IllegalStateException e) { + // that's what we want } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); + + assertEquals(2, libCache.getNumberOfManagedJobs()); + assertEquals(1, libCache.getNumberOfReferenceHolders(jobId1)); + assertEquals(1, libCache.getNumberOfReferenceHolders(jobId2)); + assertEquals(2, checkFilesExist(jobId1, keys1, cache, true)); + checkFileCountForJob(2, jobId1, server); + checkFileCountForJob(2, jobId1, cache); + assertEquals(1, checkFilesExist(jobId2, keys2, cache, true)); + checkFileCountForJob(1, jobId2, server); + checkFileCountForJob(1, jobId2, cache); + + libCache.unregisterJob(jobId1); + + assertEquals(1, libCache.getNumberOfManagedJobs()); + assertEquals(0, libCache.getNumberOfReferenceHolders(jobId1)); + 
assertEquals(1, libCache.getNumberOfReferenceHolders(jobId2)); + assertEquals(2, checkFilesExist(jobId1, keys1, cache, true)); + checkFileCountForJob(2, jobId1, server); + checkFileCountForJob(2, jobId1, cache); + assertEquals(1, checkFilesExist(jobId2, keys2, cache, true)); + checkFileCountForJob(1, jobId2, server); + checkFileCountForJob(1, jobId2, cache); + + libCache.unregisterJob(jobId2); + + assertEquals(0, libCache.getNumberOfManagedJobs()); + assertEquals(0, libCache.getNumberOfReferenceHolders(jobId1)); + assertEquals(0, libCache.getNumberOfReferenceHolders(jobId2)); + assertEquals(2, checkFilesExist(jobId1, keys1, cache, true)); + checkFileCountForJob(2, jobId1, server); + checkFileCountForJob(2, jobId1, cache); + assertEquals(1, checkFilesExist(jobId2, keys2, cache, true)); + checkFileCountForJob(1, jobId2, server); + checkFileCountForJob(1, jobId2, cache); + + // only BlobCache#releaseJob() calls clean up files (tested in BlobCacheCleanupTest etc. } finally { - if (server != null) { - server.close(); + if (libCache != null) { + libCache.shutdown(); } - if (libraryCacheManager != null) { - try { - libraryCacheManager.shutdown(); - } - catch (IOException e) { - e.printStackTrace(); - } + // should have been closed by the libraryCacheManager, but just in case + if (cache != null) { + cache.close(); + } + + if (server != null) { + server.close(); } } } /** - * Checks how many of the files given by blob keys are accessible. - * - * @param keys - * blob keys to check - * @param blobService - * BLOB store to use - * @param doThrow - * whether exceptions should be ignored (<tt>false</tt>), or throws (<tt>true</tt>) - * - * @return number of files we were able to retrieve via {@link BlobService#getFile} + * Tests that the {@link BlobLibraryCacheManager} cleans up after calling {@link + * BlobLibraryCacheManager#unregisterTask(JobID, ExecutionAttemptID)}. 
*/ - private static int checkFilesExist( - JobID jobId, List<BlobKey> keys, BlobService blobService, boolean doThrow) - throws IOException { - int numFiles = 0; + @Test + public void testLibraryCacheManagerTaskCleanup() throws IOException, InterruptedException { + + JobID jobId = new JobID(); + ExecutionAttemptID attempt1 = new ExecutionAttemptID(); + ExecutionAttemptID attempt2 = new ExecutionAttemptID(); + List<BlobKey> keys = new ArrayList<>(); + BlobServer server = null; + BlobCache cache = null; + BlobLibraryCacheManager libCache = null; + + final byte[] buf = new byte[128]; + + try { + Configuration config = new Configuration(); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, + temporaryFolder.newFolder().getAbsolutePath()); + config.setLong(BlobServerOptions.CLEANUP_INTERVAL, 1L); + + server = new BlobServer(config, new VoidBlobStore()); + InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); + BlobClient bc = new BlobClient(serverAddress, config); + cache = new BlobCache(serverAddress, config, new VoidBlobStore()); + + keys.add(bc.put(jobId, buf)); + buf[0] += 1; + keys.add(bc.put(jobId, buf)); + + bc.close(); + + libCache = new BlobLibraryCacheManager(cache); + cache.registerJob(jobId); + + assertEquals(0, libCache.getNumberOfManagedJobs()); + assertEquals(0, libCache.getNumberOfReferenceHolders(jobId)); + checkFileCountForJob(2, jobId, server); + checkFileCountForJob(0, jobId, cache); + + libCache.registerTask(jobId, attempt1, keys, Collections.<URL>emptyList()); + ClassLoader classLoader1 = libCache.getClassLoader(jobId); + + assertEquals(1, libCache.getNumberOfManagedJobs()); + assertEquals(1, libCache.getNumberOfReferenceHolders(jobId)); + assertEquals(2, checkFilesExist(jobId, keys, cache, true)); + checkFileCountForJob(2, jobId, server); + checkFileCountForJob(2, jobId, cache); + + libCache.registerTask(jobId, attempt2, keys, Collections.<URL>emptyList()); + ClassLoader classLoader2 = 
libCache.getClassLoader(jobId); + assertEquals(classLoader1, classLoader2); - for (BlobKey key : keys) { try { - if (jobId == null) { - blobService.getFile(key); - } else { - blobService.getFile(jobId, key); - } - ++numFiles; - } catch (IOException e) { - if (doThrow) { - throw e; - } + libCache.registerTask( + jobId, new ExecutionAttemptID(), Collections.<BlobKey>emptyList(), + Collections.<URL>emptyList()); + fail("Should fail with an IllegalStateException"); + } + catch (IllegalStateException e) { + // that's what we want + } + + try { + libCache.registerTask( + jobId, new ExecutionAttemptID(), keys, + Collections.singletonList(new URL("file:///tmp/does-not-exist"))); + fail("Should fail with an IllegalStateException"); + } + catch (IllegalStateException e) { + // that's what we want } + + assertEquals(1, libCache.getNumberOfManagedJobs()); + assertEquals(2, libCache.getNumberOfReferenceHolders(jobId)); + assertEquals(2, checkFilesExist(jobId, keys, cache, true)); + checkFileCountForJob(2, jobId, server); + checkFileCountForJob(2, jobId, cache); + + libCache.unregisterTask(jobId, attempt1); + + assertEquals(1, libCache.getNumberOfManagedJobs()); + assertEquals(1, libCache.getNumberOfReferenceHolders(jobId)); + assertEquals(2, checkFilesExist(jobId, keys, cache, true)); + checkFileCountForJob(2, jobId, server); + checkFileCountForJob(2, jobId, cache); + + libCache.unregisterTask(jobId, attempt2); + + assertEquals(0, libCache.getNumberOfManagedJobs()); + assertEquals(0, libCache.getNumberOfReferenceHolders(jobId)); + assertEquals(2, checkFilesExist(jobId, keys, cache, true)); + checkFileCountForJob(2, jobId, server); + checkFileCountForJob(2, jobId, cache); + + // only BlobCache#releaseJob() calls clean up files (tested in BlobCacheCleanupTest etc. 
} + finally { + if (libCache != null) { + libCache.shutdown(); + } + + // should have been closed by the libraryCacheManager, but just in case + if (cache != null) { + cache.close(); + } - return numFiles; + if (server != null) { + server.close(); + } + } } /** @@ -191,14 +313,14 @@ public class BlobLibraryCacheManagerTest { * BlobLibraryCacheManager#unregisterTask(JobID, ExecutionAttemptID)}. */ @Test - public void testLibraryCacheManagerTaskCleanup() throws IOException, InterruptedException { + public void testLibraryCacheManagerMixedJobTaskCleanup() throws IOException, InterruptedException { - JobID jid = new JobID(); - ExecutionAttemptID executionId1 = new ExecutionAttemptID(); - ExecutionAttemptID executionId2 = new ExecutionAttemptID(); - List<BlobKey> keys = new ArrayList<BlobKey>(); + JobID jobId = new JobID(); + ExecutionAttemptID attempt1 = new ExecutionAttemptID(); + List<BlobKey> keys = new ArrayList<>(); BlobServer server = null; - BlobLibraryCacheManager libraryCacheManager = null; + BlobCache cache = null; + BlobLibraryCacheManager libCache = null; final byte[] buf = new byte[128]; @@ -206,67 +328,96 @@ public class BlobLibraryCacheManagerTest { Configuration config = new Configuration(); config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath()); + config.setLong(BlobServerOptions.CLEANUP_INTERVAL, 1L); server = new BlobServer(config, new VoidBlobStore()); - InetSocketAddress blobSocketAddress = new InetSocketAddress(server.getPort()); - BlobClient bc = new BlobClient(blobSocketAddress, config); - - // TODO: make use of job-related BLOBs after adapting the BlobLibraryCacheManager -// JobID jobId = new JobID(); - JobID jobId = null; + InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); + BlobClient bc = new BlobClient(serverAddress, config); + cache = new BlobCache(serverAddress, config, new VoidBlobStore()); keys.add(bc.put(jobId, buf)); buf[0] += 1; keys.add(bc.put(jobId, 
buf)); - long cleanupInterval = 1000L; - libraryCacheManager = new BlobLibraryCacheManager(server, cleanupInterval); - libraryCacheManager.registerTask(jid, executionId1, keys, Collections.<URL>emptyList()); - libraryCacheManager.registerTask(jid, executionId2, keys, Collections.<URL>emptyList()); + bc.close(); - assertEquals(2, checkFilesExist(jobId, keys, server, true)); - assertEquals(2, libraryCacheManager.getNumberOfCachedLibraries()); - assertEquals(2, libraryCacheManager.getNumberOfReferenceHolders(jid)); + libCache = new BlobLibraryCacheManager(cache); + cache.registerJob(jobId); - libraryCacheManager.unregisterTask(jid, executionId1); + assertEquals(0, libCache.getNumberOfManagedJobs()); + assertEquals(0, libCache.getNumberOfReferenceHolders(jobId)); + checkFileCountForJob(2, jobId, server); + checkFileCountForJob(0, jobId, cache); - assertEquals(2, checkFilesExist(jobId, keys, server, true)); - assertEquals(2, libraryCacheManager.getNumberOfCachedLibraries()); - assertEquals(1, libraryCacheManager.getNumberOfReferenceHolders(jid)); + libCache.registerJob(jobId, keys, Collections.<URL>emptyList()); + ClassLoader classLoader1 = libCache.getClassLoader(jobId); - libraryCacheManager.unregisterTask(jid, executionId2); + assertEquals(1, libCache.getNumberOfManagedJobs()); + assertEquals(1, libCache.getNumberOfReferenceHolders(jobId)); + assertEquals(2, checkFilesExist(jobId, keys, cache, true)); + checkFileCountForJob(2, jobId, server); + checkFileCountForJob(2, jobId, cache); - // because we cannot guarantee that there are not thread races in the build system, we - // loop for a certain while until the references disappear - { - long deadline = System.currentTimeMillis() + 30000; - do { - Thread.sleep(100); - } - while (libraryCacheManager.getNumberOfCachedLibraries() > 0 && - System.currentTimeMillis() < deadline); + libCache.registerTask(jobId, attempt1, keys, Collections.<URL>emptyList()); + ClassLoader classLoader2 = libCache.getClassLoader(jobId); + 
assertEquals(classLoader1, classLoader2); + + try { + libCache.registerTask( + jobId, new ExecutionAttemptID(), Collections.<BlobKey>emptyList(), + Collections.<URL>emptyList()); + fail("Should fail with an IllegalStateException"); + } + catch (IllegalStateException e) { + // that's what we want } - // this fails if we exited via a timeout - assertEquals(0, libraryCacheManager.getNumberOfCachedLibraries()); - assertEquals(0, libraryCacheManager.getNumberOfReferenceHolders(jid)); + try { + libCache.registerTask( + jobId, new ExecutionAttemptID(), keys, + Collections.singletonList(new URL("file:///tmp/does-not-exist"))); + fail("Should fail with an IllegalStateException"); + } + catch (IllegalStateException e) { + // that's what we want + } - // the blob cache should no longer contain the files - assertEquals(0, checkFilesExist(jobId, keys, server, false)); + assertEquals(1, libCache.getNumberOfManagedJobs()); + assertEquals(2, libCache.getNumberOfReferenceHolders(jobId)); + assertEquals(2, checkFilesExist(jobId, keys, cache, true)); + checkFileCountForJob(2, jobId, server); + checkFileCountForJob(2, jobId, cache); - bc.close(); - } finally { - if (server != null) { - server.close(); + libCache.unregisterJob(jobId); + + assertEquals(1, libCache.getNumberOfManagedJobs()); + assertEquals(1, libCache.getNumberOfReferenceHolders(jobId)); + assertEquals(2, checkFilesExist(jobId, keys, cache, true)); + checkFileCountForJob(2, jobId, server); + checkFileCountForJob(2, jobId, cache); + + libCache.unregisterTask(jobId, attempt1); + + assertEquals(0, libCache.getNumberOfManagedJobs()); + assertEquals(0, libCache.getNumberOfReferenceHolders(jobId)); + assertEquals(2, checkFilesExist(jobId, keys, cache, true)); + checkFileCountForJob(2, jobId, server); + checkFileCountForJob(2, jobId, cache); + + // only BlobCache#releaseJob() calls clean up files (tested in BlobCacheCleanupTest etc. 
+ } + finally { + if (libCache != null) { + libCache.shutdown(); } - if (libraryCacheManager != null) { - try { - libraryCacheManager.shutdown(); - } - catch (IOException e) { - e.printStackTrace(); - } + // should have been closed by the libraryCacheManager, but just in case + if (cache != null) { + cache.close(); + } + + if (server != null) { + server.close(); } } } @@ -275,75 +426,103 @@ public class BlobLibraryCacheManagerTest { public void testRegisterAndDownload() throws IOException { assumeTrue(!OperatingSystem.isWindows()); //setWritable doesn't work on Windows. + JobID jobId = new JobID(); BlobServer server = null; BlobCache cache = null; + BlobLibraryCacheManager libCache = null; File cacheDir = null; try { // create the blob transfer services Configuration config = new Configuration(); config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath()); + config.setLong(BlobServerOptions.CLEANUP_INTERVAL, 1_000_000L); server = new BlobServer(config, new VoidBlobStore()); InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); cache = new BlobCache(serverAddress, config, new VoidBlobStore()); - // TODO: make use of job-related BLOBs after adapting the BlobLibraryCacheManager - JobID jobId = null; - // upload some meaningless data to the server BlobClient uploader = new BlobClient(serverAddress, config); BlobKey dataKey1 = uploader.put(jobId, new byte[]{1, 2, 3, 4, 5, 6, 7, 8}); BlobKey dataKey2 = uploader.put(jobId, new byte[]{11, 12, 13, 14, 15, 16, 17, 18}); uploader.close(); - BlobLibraryCacheManager libCache = new BlobLibraryCacheManager(cache, 1000000000L); - - assertEquals(0, libCache.getNumberOfCachedLibraries()); + libCache = new BlobLibraryCacheManager(cache); + assertEquals(0, libCache.getNumberOfManagedJobs()); + checkFileCountForJob(2, jobId, server); + checkFileCountForJob(0, jobId, cache); // first try to access a non-existing entry + assertEquals(0, 
libCache.getNumberOfReferenceHolders(new JobID())); try { libCache.getClassLoader(new JobID()); fail("Should fail with an IllegalStateException"); } catch (IllegalStateException e) { - // that#s what we want + // that's what we want } - // now register some BLOBs as libraries + // register some BLOBs as libraries { - JobID jid = new JobID(); - ExecutionAttemptID executionId = new ExecutionAttemptID(); Collection<BlobKey> keys = Collections.singleton(dataKey1); - libCache.registerTask(jid, executionId, keys, Collections.<URL>emptyList()); - assertEquals(1, libCache.getNumberOfReferenceHolders(jid)); - assertEquals(1, libCache.getNumberOfCachedLibraries()); - assertNotNull(libCache.getClassLoader(jid)); - - // un-register them again - libCache.unregisterTask(jid, executionId); + cache.registerJob(jobId); + ExecutionAttemptID executionId = new ExecutionAttemptID(); + libCache.registerTask(jobId, executionId, keys, Collections.<URL>emptyList()); + ClassLoader classLoader1 = libCache.getClassLoader(jobId); + assertEquals(1, libCache.getNumberOfManagedJobs()); + assertEquals(1, libCache.getNumberOfReferenceHolders(jobId)); + assertEquals(1, checkFilesExist(jobId, keys, cache, true)); + checkFileCountForJob(2, jobId, server); + checkFileCountForJob(1, jobId, cache); + assertNotNull(libCache.getClassLoader(jobId)); + + libCache.registerJob(jobId, keys, Collections.<URL>emptyList()); + ClassLoader classLoader2 = libCache.getClassLoader(jobId); + assertEquals(classLoader1, classLoader2); + assertEquals(1, libCache.getNumberOfManagedJobs()); + assertEquals(2, libCache.getNumberOfReferenceHolders(jobId)); + assertEquals(1, checkFilesExist(jobId, keys, cache, true)); + checkFileCountForJob(2, jobId, server); + checkFileCountForJob(1, jobId, cache); + assertNotNull(libCache.getClassLoader(jobId)); + + // un-register the job + libCache.unregisterJob(jobId); + // still one task + assertEquals(1, libCache.getNumberOfManagedJobs()); + assertEquals(1, 
libCache.getNumberOfReferenceHolders(jobId)); + assertEquals(1, checkFilesExist(jobId, keys, cache, true)); + checkFileCountForJob(2, jobId, server); + checkFileCountForJob(1, jobId, cache); + + // unregister the task registration + libCache.unregisterTask(jobId, executionId); + assertEquals(0, libCache.getNumberOfManagedJobs()); + assertEquals(0, libCache.getNumberOfReferenceHolders(jobId)); + // changing the libCache registration does not influence the BLOB stores... + checkFileCountForJob(2, jobId, server); + checkFileCountForJob(1, jobId, cache); // Don't fail if called again - libCache.unregisterTask(jid, executionId); + libCache.unregisterJob(jobId); + assertEquals(0, libCache.getNumberOfManagedJobs()); + assertEquals(0, libCache.getNumberOfReferenceHolders(jobId)); - assertEquals(0, libCache.getNumberOfReferenceHolders(jid)); + libCache.unregisterTask(jobId, executionId); + assertEquals(0, libCache.getNumberOfManagedJobs()); + assertEquals(0, libCache.getNumberOfReferenceHolders(jobId)); - // library is still cached (but not associated with job any more) - assertEquals(1, libCache.getNumberOfCachedLibraries()); + cache.releaseJob(jobId); - // should not be able to access the classloader any more - try { - libCache.getClassLoader(jid); - fail("Should fail with an IllegalStateException"); - } - catch (IllegalStateException e) { - // that's what we want - } + // library is still cached (but not associated with job any more) + checkFileCountForJob(2, jobId, server); + checkFileCountForJob(1, jobId, cache); } // see BlobUtils for the directory layout - cacheDir = new File(cache.getStorageDir(), "no_job"); + cacheDir = cache.getStorageLocation(jobId, new BlobKey()).getParentFile(); assertTrue(cacheDir.exists()); // make sure no further blobs can be downloaded by removing the write @@ -352,12 +531,14 @@ public class BlobLibraryCacheManagerTest { // since we cannot download this library any more, this call should fail try { - libCache.registerTask(new JobID(), new 
ExecutionAttemptID(), Collections.singleton(dataKey2), - Collections.<URL>emptyList()); + cache.registerJob(jobId); + libCache.registerTask(jobId, new ExecutionAttemptID(), Collections.singleton(dataKey2), + Collections.<URL>emptyList()); fail("This should fail with an IOException"); } catch (IOException e) { // splendid! + cache.releaseJob(jobId); } } finally { if (cacheDir != null) { @@ -368,6 +549,9 @@ public class BlobLibraryCacheManagerTest { if (cache != null) { cache.close(); } + if (libCache != null) { + libCache.shutdown(); + } if (server != null) { server.close(); } http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java index 2f6738d..e52310e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java @@ -32,6 +32,7 @@ import org.apache.flink.runtime.blob.BlobUtils; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.apache.flink.util.TestLogger; + import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -49,6 +50,9 @@ import java.util.Random; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +/** + * Integration test for {@link BlobLibraryCacheManager}. 
+ */ public class BlobLibraryCacheRecoveryITCase extends TestLogger { @Rule @@ -65,7 +69,6 @@ public class BlobLibraryCacheRecoveryITCase extends TestLogger { InetSocketAddress[] serverAddress = new InetSocketAddress[2]; BlobLibraryCacheManager[] libServer = new BlobLibraryCacheManager[2]; BlobCache cache = null; - BlobLibraryCacheManager libCache = null; BlobStoreService blobStoreService = null; Configuration config = new Configuration(); @@ -75,6 +78,7 @@ public class BlobLibraryCacheRecoveryITCase extends TestLogger { temporaryFolder.newFolder().getAbsolutePath()); config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().getAbsolutePath()); + config.setLong(BlobServerOptions.CLEANUP_INTERVAL, 3_600L); try { blobStoreService = BlobUtils.createBlobStoreFromConfig(config); @@ -82,7 +86,7 @@ public class BlobLibraryCacheRecoveryITCase extends TestLogger { for (int i = 0; i < server.length; i++) { server[i] = new BlobServer(config, blobStoreService); serverAddress[i] = new InetSocketAddress("localhost", server[i].getPort()); - libServer[i] = new BlobLibraryCacheManager(server[i], 3600 * 1000); + libServer[i] = new BlobLibraryCacheManager(server[i]); } // Random data @@ -92,25 +96,22 @@ public class BlobLibraryCacheRecoveryITCase extends TestLogger { List<BlobKey> keys = new ArrayList<>(2); JobID jobId = new JobID(); - // TODO: replace+adapt by jobId after adapting the BlobLibraryCacheManager - JobID blobJobId = null; // Upload some data (libraries) try (BlobClient client = new BlobClient(serverAddress[0], config)) { - keys.add(client.put(blobJobId, expected)); // Request 1 - keys.add(client.put(blobJobId, expected, 32, 256)); // Request 2 + keys.add(client.put(jobId, expected)); // Request 1 + keys.add(client.put(jobId, expected, 32, 256)); // Request 2 } // The cache cache = new BlobCache(serverAddress[0], config, blobStoreService); - libCache = new BlobLibraryCacheManager(cache, 3600 * 1000); // Register uploaded libraries 
ExecutionAttemptID executionId = new ExecutionAttemptID(); libServer[0].registerTask(jobId, executionId, keys, Collections.<URL>emptyList()); // Verify key 1 - File f = cache.getFile(keys.get(0)); + File f = cache.getFile(jobId, keys.get(0)); assertEquals(expected.length, f.length()); try (FileInputStream fis = new FileInputStream(f)) { @@ -123,13 +124,11 @@ public class BlobLibraryCacheRecoveryITCase extends TestLogger { // Shutdown cache and start with other server cache.close(); - libCache.shutdown(); cache = new BlobCache(serverAddress[1], config, blobStoreService); - libCache = new BlobLibraryCacheManager(cache, 3600 * 1000); // Verify key 1 - f = cache.getFile(keys.get(0)); + f = cache.getFile(jobId, keys.get(0)); assertEquals(expected.length, f.length()); try (FileInputStream fis = new FileInputStream(f)) { @@ -141,7 +140,7 @@ public class BlobLibraryCacheRecoveryITCase extends TestLogger { } // Verify key 2 - f = cache.getFile(keys.get(1)); + f = cache.getFile(jobId, keys.get(1)); assertEquals(256, f.length()); try (FileInputStream fis = new FileInputStream(f)) { @@ -154,8 +153,8 @@ public class BlobLibraryCacheRecoveryITCase extends TestLogger { // Remove blobs again try (BlobClient client = new BlobClient(serverAddress[1], config)) { - client.delete(keys.get(0)); - client.delete(keys.get(1)); + client.delete(jobId, keys.get(0)); + client.delete(jobId, keys.get(1)); } // Verify everything is clean below recoveryDir/<cluster_id> @@ -167,6 +166,11 @@ public class BlobLibraryCacheRecoveryITCase extends TestLogger { assertEquals("Unclean state backend: " + Arrays.toString(recoveryFiles), 0, recoveryFiles.length); } finally { + for (BlobLibraryCacheManager s : libServer) { + if (s != null) { + s.shutdown(); + } + } for (BlobServer s : server) { if (s != null) { s.close(); @@ -177,10 +181,6 @@ public class BlobLibraryCacheRecoveryITCase extends TestLogger { cache.close(); } - if (libCache != null) { - libCache.shutdown(); - } - if (blobStoreService != null) { 
blobStoreService.closeAndCleanupAllData(); } http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerCleanupITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerCleanupITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerCleanupITCase.java new file mode 100644 index 0000000..b2b455b --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerCleanupITCase.java @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.FailingBlockingInvokable;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+
+import static org.apache.flink.runtime.testingUtils.TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+/**
+ * Small test to check that the {@link org.apache.flink.runtime.blob.BlobServer} cleanup is executed
+ * after job termination.
+ */
+public class JobManagerCleanupITCase extends TestLogger {
+
+	@Rule
+	public TemporaryFolder tmpFolder = new TemporaryFolder();
+
+	private static ActorSystem system;
+
+	@BeforeClass
+	public static void setup() {
+		system = AkkaUtils.createLocalActorSystem(new Configuration());
+	}
+
+	@AfterClass
+	public static void teardown() {
+		JavaTestKit.shutdownActorSystem(system);
+	}
+
+	/**
+	 * Specifies which test case to run in {@link #testBlobServerCleanup(TestCase)}.
+	 */
+	private enum TestCase {
+		JOB_FINISHES_SUCCESSFULLY,
+		JOB_IS_CANCELLED,
+		JOB_FAILS,
+		JOB_SUBMISSION_FAILS
+	}
+
+	/**
+	 * Test cleanup for a job that finishes ordinarily.
+	 */
+	@Test
+	public void testBlobServerCleanupFinishedJob() throws IOException {
+		testBlobServerCleanup(TestCase.JOB_FINISHES_SUCCESSFULLY);
+	}
+
+	/**
+	 * Test cleanup for a job which is cancelled after submission.
+	 */
+	@Test
+	public void testBlobServerCleanupCancelledJob() throws IOException {
+		testBlobServerCleanup(TestCase.JOB_IS_CANCELLED);
+	}
+
+	/**
+	 * Test cleanup for a job that fails (first a task fails, then the job recovers, then the whole
+	 * job fails due to a limited restart policy).
+	 */
+	@Test
+	public void testBlobServerCleanupFailedJob() throws IOException {
+		testBlobServerCleanup(TestCase.JOB_FAILS);
+	}
+
+	/**
+	 * Test cleanup for a job that fails job submission (emulated by an additional BLOB not being
+	 * present).
+	 */
+	@Test
+	public void testBlobServerCleanupFailedSubmission() throws IOException {
+		testBlobServerCleanup(TestCase.JOB_SUBMISSION_FAILS);
+	}
+
+	private void testBlobServerCleanup(final TestCase testCase) throws IOException {
+		final int numTasks = 2;
+		final File blobBaseDir = tmpFolder.newFolder();
+
+		new JavaTestKit(system) {{
+			new Within(duration("30 seconds")) {
+				@Override
+				protected void run() {
+					// Setup
+
+					TestingCluster cluster = null;
+					BlobClient bc = null;
+
+					try {
+						Configuration config = new Configuration();
+						config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numTasks);
+						config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
+						config.setString(AkkaOptions.ASK_TIMEOUT, DEFAULT_AKKA_ASK_TIMEOUT());
+						config.setString(BlobServerOptions.STORAGE_DIRECTORY, blobBaseDir.getAbsolutePath());
+
+						config.setString(ConfigConstants.RESTART_STRATEGY, "fixeddelay");
+						config.setInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1);
+						config.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "1 s");
+						// BLOBs are deleted from BlobCache between 1s and 2s after last reference
+						// -> the BlobCache may still have the BLOB or not (let's test both cases randomly)
+						config.setLong(BlobServerOptions.CLEANUP_INTERVAL, 1L);
+
+						cluster = new TestingCluster(config);
+						cluster.start();
+
+						final ActorGateway jobManagerGateway = cluster.getLeaderGateway(
+							TestingUtils.TESTING_DURATION());
+
+						// we can set the leader session ID to None because we don't use this gateway to send messages
+						final ActorGateway testActorGateway = new AkkaActorGateway(getTestActor(),
+							HighAvailabilityServices.DEFAULT_LEADER_ID);
+
+						// Create a task
+
+						JobVertex source = new JobVertex("Source");
+						if (testCase == TestCase.JOB_FAILS || testCase == TestCase.JOB_IS_CANCELLED) {
+							source.setInvokableClass(FailingBlockingInvokable.class);
+						} else {
+							source.setInvokableClass(NoOpInvokable.class);
+						}
+						source.setParallelism(numTasks);
+
+						JobGraph jobGraph = new JobGraph("BlobCleanupTest", source);
+						final JobID jid = jobGraph.getJobID();
+
+						// request the blob port from the job manager
+						Future<Object> future = jobManagerGateway
+							.ask(JobManagerMessages.getRequestBlobManagerPort(), remaining());
+						int blobPort = (Integer) Await.result(future, remaining());
+
+						// upload a blob
+						BlobKey key1;
+						bc = new BlobClient(new InetSocketAddress("localhost", blobPort),
+							config);
+						try {
+							key1 = bc.put(jid, new byte[10]);
+						} finally {
+							bc.close();
+						}
+						jobGraph.addBlob(key1);
+
+						if (testCase == TestCase.JOB_SUBMISSION_FAILS) {
+							// add an invalid key so that the submission fails
+							jobGraph.addBlob(new BlobKey());
+						}
+
+						// Submit the job and wait for the submission/result messages expected by this test case
+						jobManagerGateway.tell(
+							new JobManagerMessages.SubmitJob(
+								jobGraph,
+								ListeningBehaviour.EXECUTION_RESULT),
+							testActorGateway);
+						if (testCase == TestCase.JOB_SUBMISSION_FAILS) {
+							expectMsgClass(JobManagerMessages.JobResultFailure.class);
+						} else {
+							expectMsgClass(JobManagerMessages.JobSubmitSuccess.class);
+
+							if (testCase == TestCase.JOB_FAILS) {
+								// fail a task so that the job is going to be recovered (we actually do not
+								// need the blocking part of the invokable and can start throwing right away)
+								FailingBlockingInvokable.unblock();
+
+								// job will get restarted, BlobCache may re-download the BLOB if already deleted
+								// then the tasks will fail again and the restart strategy will finalise the job
+
+								expectMsgClass(JobManagerMessages.JobResultFailure.class);
+							} else if (testCase == TestCase.JOB_IS_CANCELLED) {
+								jobManagerGateway.tell(
+									new JobManagerMessages.CancelJob(jid),
+									testActorGateway);
+								expectMsgClass(JobManagerMessages.CancellationResponse.class);
+
+								// job will be cancelled and everything should be cleaned up
+
+								expectMsgClass(JobManagerMessages.JobResultFailure.class);
+							} else {
+								expectMsgClass(JobManagerMessages.JobResultSuccess.class);
+							}
+						}
+
+						// both BlobServer and BlobCache should eventually delete all files
+
+						File[] blobDirs = blobBaseDir.listFiles(new FilenameFilter() {
+							@Override
+							public boolean accept(File dir, String name) {
+								return name.startsWith("blobStore-");
+							}
+						});
+						assertNotNull(blobDirs);
+						for (File blobDir : blobDirs) {
+							waitForEmptyBlobDir(blobDir, remaining());
+						}
+
+					} catch (Exception e) {
+						// keep the original exception as the cause so the test report shows where it failed
+						throw new AssertionError("Unexpected exception: " + e.getMessage(), e);
+					} finally {
+						if (bc != null) {
+							try {
+								// normally already closed after the upload; repeated here in case setup failed earlier
+								bc.close();
+							} catch (IOException ignored) {
+							}
+						}
+						if (cluster != null) {
+							cluster.shutdown();
+						}
+					}
+				}
+			};
+		}};
+
+		// after everything has been shut down, the storage directory itself should be empty
+		assertArrayEquals(new File[] {}, blobBaseDir.listFiles());
+	}
+
+	/**
+	 * Waits until the given {@link org.apache.flink.runtime.blob.BlobService} storage directory
+	 * does not contain any job-related folders any more.
+	 *
+	 * @param blobDir
+	 *		directory of a {@link org.apache.flink.runtime.blob.BlobServer} or {@link
+	 *		org.apache.flink.runtime.blob.BlobCache}
+	 * @param remaining
+	 *		remaining time for this test
+	 *
+	 * @see org.apache.flink.runtime.blob.BlobUtils
+	 */
+	private static void waitForEmptyBlobDir(File blobDir, FiniteDuration remaining)
+		throws InterruptedException {
+		long deadline = System.currentTimeMillis() + remaining.toMillis();
+		String[] blobDirContents;
+		do {
+			blobDirContents = blobDir.list(new FilenameFilter() {
+				@Override
+				public boolean accept(File dir, String name) {
+					return name.startsWith("job_");
+				}
+			});
+			if (blobDirContents == null || blobDirContents.length == 0) {
+				return;
+			}
+			Thread.sleep(100);
+		} while (System.currentTimeMillis() < deadline);
+
+		fail("Timeout while waiting for " + blobDir.getAbsolutePath() + " to become empty. Current contents: " + Arrays.toString(blobDirContents));
+	}
+}