http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index 23f0a38..933c7a0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.jobmanager;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
@@ -27,7 +28,6 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.blob.BlobServer;
-import org.apache.flink.runtime.blob.BlobService;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
@@ -90,6 +90,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
+
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -158,6 +159,7 @@ public class JobManagerHARecoveryTest extends TestLogger {
                flinkConfiguration.setString(HighAvailabilityOptions.HA_MODE, 
"zookeeper");
                
flinkConfiguration.setString(HighAvailabilityOptions.HA_STORAGE_PATH, 
temporaryFolder.newFolder().toString());
                
flinkConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
slots);
+               flinkConfiguration.setLong(BlobServerOptions.CLEANUP_INTERVAL, 
3_600L);
 
                try {
                        Scheduler scheduler = new 
Scheduler(TestingUtils.defaultExecutionContext());
@@ -179,6 +181,9 @@ public class JobManagerHARecoveryTest extends TestLogger {
 
                        archive = 
system.actorOf(JobManager.getArchiveProps(MemoryArchivist.class, 10, 
Option.<Path>empty()));
 
+                       BlobServer blobServer = new BlobServer(
+                               flinkConfiguration,
+                               
testingHighAvailabilityServices.createBlobStore());
                        Props jobManagerProps = Props.create(
                                TestingJobManager.class,
                                flinkConfiguration,
@@ -186,11 +191,8 @@ public class JobManagerHARecoveryTest extends TestLogger {
                                TestingUtils.defaultExecutor(),
                                instanceManager,
                                scheduler,
-                               new BlobLibraryCacheManager(
-                                       new BlobServer(
-                                               flinkConfiguration,
-                                               
testingHighAvailabilityServices.createBlobStore()),
-                                       3600000L),
+                               blobServer,
+                               new BlobLibraryCacheManager(blobServer),
                                archive,
                                new 
FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(Int.MaxValue(), 100),
                                timeout,
@@ -353,6 +355,7 @@ public class JobManagerHARecoveryTest extends TestLogger {
 
                        final Collection<JobID> recoveredJobs = new 
ArrayList<>(2);
 
+                       BlobServer blobServer = mock(BlobServer.class);
                        Props jobManagerProps = Props.create(
                                TestingFailingHAJobManager.class,
                                flinkConfiguration,
@@ -360,7 +363,8 @@ public class JobManagerHARecoveryTest extends TestLogger {
                                TestingUtils.defaultExecutor(),
                                mock(InstanceManager.class),
                                mock(Scheduler.class),
-                               new 
BlobLibraryCacheManager(mock(BlobService.class), 1 << 20),
+                               blobServer,
+                               new BlobLibraryCacheManager(blobServer),
                                ActorRef.noSender(),
                                new 
FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(Int.MaxValue(), 100),
                                timeout,
@@ -397,6 +401,7 @@ public class JobManagerHARecoveryTest extends TestLogger {
                        Executor ioExecutor,
                        InstanceManager instanceManager,
                        Scheduler scheduler,
+                       BlobServer blobServer,
                        BlobLibraryCacheManager libraryCacheManager,
                        ActorRef archive,
                        RestartStrategyFactory restartStrategyFactory,
@@ -413,6 +418,7 @@ public class JobManagerHARecoveryTest extends TestLogger {
                                ioExecutor,
                                instanceManager,
                                scheduler,
+                               blobServer,
                                libraryCacheManager,
                                archive,
                                restartStrategyFactory,

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
index 3c75971..6a39293 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
@@ -137,14 +137,13 @@ public class JobSubmitTest {
                        // upload two dummy bytes and add their keys to the job 
graph as dependencies
                        BlobKey key1, key2;
                        BlobClient bc = new BlobClient(new 
InetSocketAddress("localhost", blobPort), jmConfig);
-                       // TODO: make use of job-related BLOBs after adapting 
the BlobLibraryCacheManager
-                       JobID jobId = null;
+                       JobID jobId = jg.getJobID();
                        try {
                                key1 = bc.put(jobId, new byte[10]);
                                key2 = bc.put(jobId, new byte[10]);
 
                                // delete one of the blobs to make sure that 
the startup failed
-                               bc.delete(key2);
+                               bc.delete(jobId, key2);
                        }
                        finally {
                                bc.close();

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index df35369..2c17b5a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobmaster;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
@@ -92,8 +93,8 @@ public class JobMasterTest extends TestLogger {
 
                final ScheduledExecutor scheduledExecutor = 
mock(ScheduledExecutor.class);
                final HeartbeatServices heartbeatServices = new 
TestingHeartbeatServices(heartbeatInterval, heartbeatTimeout, 
scheduledExecutor);
-               final BlobLibraryCacheManager libraryCacheManager = 
mock(BlobLibraryCacheManager.class);
-               when(libraryCacheManager.getBlobServerPort()).thenReturn(1337);
+               BlobServer blobServer = mock(BlobServer.class);
+               when(blobServer.getPort()).thenReturn(1337);
 
                final JobGraph jobGraph = new JobGraph();
 
@@ -106,7 +107,8 @@ public class JobMasterTest extends TestLogger {
                                haServices,
                                heartbeatServices,
                                Executors.newScheduledThreadPool(1),
-                               libraryCacheManager,
+                               blobServer,
+                               mock(BlobLibraryCacheManager.class),
                                mock(RestartStrategyFactory.class),
                                Time.of(10, TimeUnit.SECONDS),
                                null,
@@ -204,6 +206,7 @@ public class JobMasterTest extends TestLogger {
                                haServices,
                                heartbeatServices,
                                Executors.newScheduledThreadPool(1),
+                               mock(BlobServer.class),
                                mock(BlobLibraryCacheManager.class),
                                mock(RestartStrategyFactory.class),
                                Time.of(10, TimeUnit.SECONDS),

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
index 70800e5..230ca91 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
@@ -25,9 +25,10 @@ import akka.actor.Props;
 import akka.pattern.Patterns;
 import akka.testkit.JavaTestKit;
 import akka.util.Timeout;
-
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.test.TestingServer;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobServer;
@@ -47,13 +48,11 @@ import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.util.TestLogger;
-
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
-
 import scala.Option;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
@@ -178,6 +177,9 @@ public class JobManagerLeaderElectionTest extends 
TestLogger {
                SubmittedJobGraphStore submittedJobGraphStore = new 
StandaloneSubmittedJobGraphStore();
                CheckpointRecoveryFactory checkpointRecoveryFactory = new 
StandaloneCheckpointRecoveryFactory();
 
+               configuration.setLong(BlobServerOptions.CLEANUP_INTERVAL, 1L);
+
+               BlobServer blobServer = new BlobServer(configuration, new 
VoidBlobStore());
                return Props.create(
                        TestingJobManager.class,
                        configuration,
@@ -185,7 +187,8 @@ public class JobManagerLeaderElectionTest extends 
TestLogger {
                        TestingUtils.defaultExecutor(),
                        new InstanceManager(),
                        new Scheduler(TestingUtils.defaultExecutionContext()),
-                       new BlobLibraryCacheManager(new 
BlobServer(configuration, new VoidBlobStore()), 10L),
+                       blobServer,
+                       new BlobLibraryCacheManager(blobServer),
                        ActorRef.noSender(),
                        new NoRestartStrategy.NoRestartStrategyFactory(),
                        AkkaUtils.getDefaultTimeoutAsFiniteDuration(),

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 43ff60b..6842bee 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
@@ -668,7 +669,7 @@ public class TaskExecutorTest extends TestLogger {
                                
Collections.<InputGateDeploymentDescriptor>emptyList());
 
                final LibraryCacheManager libraryCacheManager = 
mock(LibraryCacheManager.class);
-               
when(libraryCacheManager.getClassLoader(eq(jobId))).thenReturn(getClass().getClassLoader());
+               
when(libraryCacheManager.getClassLoader(any(JobID.class))).thenReturn(ClassLoader.getSystemClassLoader());
 
                final JobManagerConnection jobManagerConnection = new 
JobManagerConnection(
                        jobId,
@@ -677,6 +678,7 @@ public class TaskExecutorTest extends TestLogger {
                        jobManagerLeaderId,
                        mock(TaskManagerActions.class),
                        mock(CheckpointResponder.class),
+                       mock(BlobCache.class),
                        libraryCacheManager,
                        mock(ResultPartitionConsumableNotifier.class),
                        mock(PartitionProducerStateChecker.class));
@@ -1191,6 +1193,7 @@ public class TaskExecutorTest extends TestLogger {
                        jobManagerLeaderId,
                        mock(TaskManagerActions.class),
                        mock(CheckpointResponder.class),
+                       mock(BlobCache.class),
                        libraryCacheManager,
                        mock(ResultPartitionConsumableNotifier.class),
                        mock(PartitionProducerStateChecker.class));

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 085a386..392dc29 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
@@ -145,6 +146,7 @@ public class TaskAsyncCallTest {
        }
        
        private static Task createTask() throws Exception {
+               BlobCache blobCache = mock(BlobCache.class);
                LibraryCacheManager libCache = mock(LibraryCacheManager.class);
                
when(libCache.getClassLoader(any(JobID.class))).thenReturn(ClassLoader.getSystemClassLoader());
                
@@ -195,6 +197,7 @@ public class TaskAsyncCallTest {
                        mock(TaskManagerActions.class),
                        mock(InputSplitProvider.class),
                        mock(CheckpointResponder.class),
+                       blobCache,
                        libCache,
                        mock(FileCache.class),
                        new TestingTaskManagerRuntimeInfo(),

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
index 1ebd4ad..ac0df36 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.taskmanager;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
@@ -98,6 +99,7 @@ public class TaskStopTest {
                        mock(TaskManagerActions.class),
                        mock(InputSplitProvider.class),
                        mock(CheckpointResponder.class),
+                       mock(BlobCache.class),
                        mock(LibraryCacheManager.class),
                        mock(FileCache.class),
                        tmRuntimeInfo,

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index ba3e820..d4cd0cf 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
@@ -227,7 +228,8 @@ public class TaskTest extends TestLogger {
        @Test
        public void testLibraryCacheRegistrationFailed() {
                try {
-                       Task task = createTask(TestInvokableCorrect.class, 
mock(LibraryCacheManager.class));
+                       Task task = createTask(TestInvokableCorrect.class, 
mock(BlobCache.class),
+                               mock(LibraryCacheManager.class));
 
                        // task should be new and perfect
                        assertEquals(ExecutionState.CREATED, 
task.getExecutionState());
@@ -260,6 +262,7 @@ public class TaskTest extends TestLogger {
        @Test
        public void testExecutionFailsInNetworkRegistration() {
                try {
+                       BlobCache blobCache = mock(BlobCache.class);
                        // mock a working library cache
                        LibraryCacheManager libCache = 
mock(LibraryCacheManager.class);
                        
when(libCache.getClassLoader(any(JobID.class))).thenReturn(getClass().getClassLoader());
@@ -274,7 +277,7 @@ public class TaskTest extends TestLogger {
                        
when(network.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC);
                        doThrow(new 
RuntimeException("buffers")).when(network).registerTask(any(Task.class));
 
-                       Task task = createTask(TestInvokableCorrect.class, 
libCache, network, consumableNotifier, partitionProducerStateChecker, executor);
+                       Task task = createTask(TestInvokableCorrect.class, 
blobCache, libCache, network, consumableNotifier, 
partitionProducerStateChecker, executor);
 
                        task.registerExecutionListener(listener);
 
@@ -617,6 +620,7 @@ public class TaskTest extends TestLogger {
                IntermediateDataSetID resultId = new IntermediateDataSetID();
                ResultPartitionID partitionId = new ResultPartitionID();
 
+               BlobCache blobCache = mock(BlobCache.class);
                LibraryCacheManager libCache = mock(LibraryCacheManager.class);
                
when(libCache.getClassLoader(any(JobID.class))).thenReturn(getClass().getClassLoader());
 
@@ -629,7 +633,7 @@ public class TaskTest extends TestLogger {
                when(network.createKvStateTaskRegistry(any(JobID.class), 
any(JobVertexID.class)))
                        .thenReturn(mock(TaskKvStateRegistry.class));
 
-               createTask(InvokableBlockingInInvoke.class, libCache, network, 
consumableNotifier, partitionChecker, Executors.directExecutor());
+               createTask(InvokableBlockingInInvoke.class, blobCache, 
libCache, network, consumableNotifier, partitionChecker, 
Executors.directExecutor());
 
                // Test all branches of trigger partition state check
 
@@ -638,7 +642,7 @@ public class TaskTest extends TestLogger {
                        createQueuesAndActors();
 
                        // PartitionProducerDisposedException
-                       Task task = createTask(InvokableBlockingInInvoke.class, 
libCache, network, consumableNotifier, partitionChecker, 
Executors.directExecutor());
+                       Task task = createTask(InvokableBlockingInInvoke.class, 
blobCache, libCache, network, consumableNotifier, partitionChecker, 
Executors.directExecutor());
 
                        CompletableFuture<ExecutionState> promise = new 
CompletableFuture<>();
                        
when(partitionChecker.requestPartitionProducerState(eq(task.getJobID()), 
eq(resultId), eq(partitionId))).thenReturn(promise);
@@ -654,7 +658,7 @@ public class TaskTest extends TestLogger {
                        createQueuesAndActors();
 
                        // Any other exception
-                       Task task = createTask(InvokableBlockingInInvoke.class, 
libCache, network, consumableNotifier, partitionChecker, 
Executors.directExecutor());
+                       Task task = createTask(InvokableBlockingInInvoke.class, 
blobCache, libCache, network, consumableNotifier, partitionChecker, 
Executors.directExecutor());
 
                        CompletableFuture<ExecutionState> promise = new 
CompletableFuture<>();
                        
when(partitionChecker.requestPartitionProducerState(eq(task.getJobID()), 
eq(resultId), eq(partitionId))).thenReturn(promise);
@@ -671,7 +675,7 @@ public class TaskTest extends TestLogger {
                        createQueuesAndActors();
 
                        // TimeoutException handled special => retry
-                       Task task = createTask(InvokableBlockingInInvoke.class, 
libCache, network, consumableNotifier, partitionChecker, 
Executors.directExecutor());
+                       Task task = createTask(InvokableBlockingInInvoke.class, 
blobCache, libCache, network, consumableNotifier, partitionChecker, 
Executors.directExecutor());
                        SingleInputGate inputGate = mock(SingleInputGate.class);
                        
when(inputGate.getConsumedResultId()).thenReturn(resultId);
 
@@ -702,7 +706,7 @@ public class TaskTest extends TestLogger {
                        createQueuesAndActors();
 
                        // Success
-                       Task task = createTask(InvokableBlockingInInvoke.class, 
libCache, network, consumableNotifier, partitionChecker, 
Executors.directExecutor());
+                       Task task = createTask(InvokableBlockingInInvoke.class, 
blobCache, libCache, network, consumableNotifier, partitionChecker, 
Executors.directExecutor());
                        SingleInputGate inputGate = mock(SingleInputGate.class);
                        
when(inputGate.getConsumedResultId()).thenReturn(resultId);
 
@@ -882,26 +886,30 @@ public class TaskTest extends TestLogger {
        }
 
        private Task createTask(Class<? extends AbstractInvokable> invokable, 
Configuration config) throws IOException {
+               BlobCache blobCache = mock(BlobCache.class);
                LibraryCacheManager libCache = mock(LibraryCacheManager.class);
                
when(libCache.getClassLoader(any(JobID.class))).thenReturn(getClass().getClassLoader());
-               return createTask(invokable, libCache, config, new 
ExecutionConfig());
+               return createTask(invokable, blobCache,libCache, config, new 
ExecutionConfig());
        }
 
        private Task createTask(Class<? extends AbstractInvokable> invokable, 
Configuration config, ExecutionConfig execConfig) throws IOException {
+               BlobCache blobCache = mock(BlobCache.class);
                LibraryCacheManager libCache = mock(LibraryCacheManager.class);
                
when(libCache.getClassLoader(any(JobID.class))).thenReturn(getClass().getClassLoader());
-               return createTask(invokable, libCache, config, execConfig);
+               return createTask(invokable, blobCache,libCache, config, 
execConfig);
        }
 
        private Task createTask(
                        Class<? extends AbstractInvokable> invokable,
+                       BlobCache blobCache,
                        LibraryCacheManager libCache) throws IOException {
 
-               return createTask(invokable, libCache, new Configuration(), new 
ExecutionConfig());
+               return createTask(invokable, blobCache,libCache, new 
Configuration(), new ExecutionConfig());
        }
 
        private Task createTask(
                        Class<? extends AbstractInvokable> invokable,
+                       BlobCache blobCache,
                        LibraryCacheManager libCache,
                        Configuration config,
                        ExecutionConfig execConfig) throws IOException {
@@ -916,21 +924,23 @@ public class TaskTest extends TestLogger {
                when(network.createKvStateTaskRegistry(any(JobID.class), 
any(JobVertexID.class)))
                                .thenReturn(mock(TaskKvStateRegistry.class));
 
-               return createTask(invokable, libCache, network, 
consumableNotifier, partitionProducerStateChecker, executor, config, 
execConfig);
+               return createTask(invokable, blobCache, libCache, network, 
consumableNotifier, partitionProducerStateChecker, executor, config, 
execConfig);
        }
 
        private Task createTask(
                        Class<? extends AbstractInvokable> invokable,
+                       BlobCache blobCache,
                        LibraryCacheManager libCache,
                        NetworkEnvironment networkEnvironment,
                        ResultPartitionConsumableNotifier consumableNotifier,
                        PartitionProducerStateChecker 
partitionProducerStateChecker,
                        Executor executor) throws IOException {
-               return createTask(invokable, libCache, networkEnvironment, 
consumableNotifier, partitionProducerStateChecker, executor, new 
Configuration(), new ExecutionConfig());
+               return createTask(invokable, blobCache, libCache, 
networkEnvironment, consumableNotifier, partitionProducerStateChecker, 
executor, new Configuration(), new ExecutionConfig());
        }
        
        private Task createTask(
                Class<? extends AbstractInvokable> invokable,
+               BlobCache blobCache,
                LibraryCacheManager libCache,
                NetworkEnvironment networkEnvironment,
                ResultPartitionConsumableNotifier consumableNotifier,
@@ -991,6 +1001,7 @@ public class TaskTest extends TestLogger {
                        taskManagerConnection,
                        inputSplitProvider,
                        checkpointResponder,
+                       blobCache,
                        libCache,
                        mock(FileCache.class),
                        new TestingTaskManagerRuntimeInfo(taskManagerConfig),

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/FailingBlockingInvokable.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/FailingBlockingInvokable.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/FailingBlockingInvokable.java
new file mode 100644
index 0000000..37c141d
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/FailingBlockingInvokable.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testtasks;
+
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+
+/**
+ * Test task that blocks until the static {@link #unblock()} method is called and then fails
+ * with an exception.
+ *
+ * <p>The blocking flag is static and is never reset in this class, so a single call to
+ * {@link #unblock()} releases every instance running in the same JVM.
+ */
+public class FailingBlockingInvokable extends AbstractInvokable {
+       private static volatile boolean blocking = true;
+       private static final Object lock = new Object();
+
+       /**
+        * Waits until {@link #unblock()} has been called and then fails.
+        *
+        * @throws RuntimeException once the task has been unblocked
+        * @throws InterruptedException if the thread is interrupted while waiting
+        */
+       @Override
+       public void invoke() throws Exception {
+               // Check the flag while holding the lock. Checking it outside would allow unblock()
+               // to clear the flag and notify between the check and wait(), losing the wake-up.
+               synchronized (lock) {
+                       while (blocking) {
+                               lock.wait();
+                       }
+               }
+               throw new RuntimeException("This exception is expected.");
+       }
+
+       /**
+        * Clears the blocking flag and wakes up all tasks waiting in {@link #invoke()}.
+        */
+       public static void unblock() {
+               synchronized (lock) {
+                       blocking = false;
+                       lock.notifyAll();
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
index c1df5a3..229f1eb 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
@@ -178,6 +179,7 @@ public class JvmExitOnFatalErrorTest {
                                                new NoOpTaskManagerActions(),
                                                new NoOpInputSplitProvider(),
                                                new NoOpCheckpointResponder(),
+                                               mock(BlobCache.class),
                                                new 
FallbackLibraryCacheManager(),
                                                new 
FileCache(tmInfo.getTmpDirectories()),
                                                tmInfo,

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
index 1b9ee48..95da981 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
@@ -264,14 +264,15 @@ ImplicitSender with WordSpecLike with Matchers with 
BeforeAndAfterAll with Befor
       components._1,
       components._2,
       components._3,
-      ActorRef.noSender,
       components._4,
+      ActorRef.noSender,
       components._5,
+      components._6,
       highAvailabilityServices.getJobManagerLeaderElectionService(
         HighAvailabilityServices.DEFAULT_JOB_ID),
       highAvailabilityServices.getSubmittedJobGraphStore(),
       highAvailabilityServices.getCheckpointRecoveryFactory(),
-      components._8,
+      components._9,
       None)
 
     _system.actorOf(props)

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index e5655bb..87f8088 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -28,6 +28,7 @@ import akka.testkit.CallingThreadDispatcher
 import org.apache.flink.api.common.JobID
 import org.apache.flink.configuration.{Configuration, JobManagerOptions}
 import org.apache.flink.runtime.akka.AkkaUtils
+import org.apache.flink.runtime.blob.BlobServer
 import org.apache.flink.runtime.checkpoint.savepoint.Savepoint
 import org.apache.flink.runtime.checkpoint.{CheckpointOptions, 
CheckpointRecoveryFactory}
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager
@@ -110,6 +111,7 @@ class TestingCluster(
     ioExecutor: Executor,
     instanceManager: InstanceManager,
     scheduler: Scheduler,
+    blobServer: BlobServer,
     libraryCacheManager: BlobLibraryCacheManager,
     archive: ActorRef,
     restartStrategyFactory: RestartStrategyFactory,
@@ -127,6 +129,7 @@ class TestingCluster(
       ioExecutor,
       instanceManager,
       scheduler,
+      blobServer,
       libraryCacheManager,
       archive,
       restartStrategyFactory,

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
index f50a832..8b9ce15 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.{Executor, 
ScheduledExecutorService}
 
 import akka.actor.ActorRef
 import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.blob.BlobServer
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
@@ -34,15 +35,16 @@ import org.apache.flink.runtime.metrics.MetricRegistry
 import scala.concurrent.duration._
 import scala.language.postfixOps
 
-/** JobManager implementation extended by testing messages
-  *
-  */
+/**
+ * JobManager implementation extended by testing messages
+ */
 class TestingJobManager(
     flinkConfiguration: Configuration,
     futureExecutor: ScheduledExecutorService,
     ioExecutor: Executor,
     instanceManager: InstanceManager,
     scheduler: Scheduler,
+    blobServer: BlobServer,
     libraryCacheManager: BlobLibraryCacheManager,
     archive: ActorRef,
     restartStrategyFactory: RestartStrategyFactory,
@@ -58,6 +60,7 @@ class TestingJobManager(
     ioExecutor,
     instanceManager,
     scheduler,
+    blobServer,
     libraryCacheManager,
     archive,
     restartStrategyFactory,

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
index 3b8178b..82642ea 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
@@ -156,6 +157,7 @@ public class BlockingCheckpointsTest {
                                mock(TaskManagerActions.class),
                                mock(InputSplitProvider.class),
                                mock(CheckpointResponder.class),
+                               mock(BlobCache.class),
                                new FallbackLibraryCacheManager(),
                                new FileCache(new String[] { 
EnvironmentInformation.getTemporaryFileDirectory() }),
                                new TestingTaskManagerRuntimeInfo(),

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
index 82e4f31..14ae733 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
@@ -24,6 +24,7 @@ import 
org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
@@ -274,6 +275,7 @@ public class InterruptSensitiveRestoreTest {
                        mock(TaskManagerActions.class),
                        mock(InputSplitProvider.class),
                        mock(CheckpointResponder.class),
+                       mock(BlobCache.class),
                        new FallbackLibraryCacheManager(),
                        new FileCache(new String[] { 
EnvironmentInformation.getTemporaryFileDirectory() }),
                        new TestingTaskManagerRuntimeInfo(),

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
index 702d833..79e9583 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
@@ -153,6 +154,7 @@ public class StreamTaskTerminationTest extends TestLogger {
                        mock(TaskManagerActions.class),
                        mock(InputSplitProvider.class),
                        mock(CheckpointResponder.class),
+                       mock(BlobCache.class),
                        new FallbackLibraryCacheManager(),
                        mock(FileCache.class),
                        taskManagerRuntimeInfo,

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 09e9a1b..08c3207 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
@@ -796,6 +797,7 @@ public class StreamTaskTest extends TestLogger {
                        StreamConfig taskConfig,
                        Configuration taskManagerConfig) throws Exception {
 
+               BlobCache blobCache = mock(BlobCache.class);
                LibraryCacheManager libCache = mock(LibraryCacheManager.class);
                
when(libCache.getClassLoader(any(JobID.class))).thenReturn(StreamTaskTest.class.getClassLoader());
 
@@ -844,6 +846,7 @@ public class StreamTaskTest extends TestLogger {
                        mock(TaskManagerActions.class),
                        mock(InputSplitProvider.class),
                        mock(CheckpointResponder.class),
+                       blobCache,
                        libCache,
                        mock(FileCache.class),
                        new TestingTaskManagerRuntimeInfo(taskManagerConfig, 
new String[] {System.getProperty("java.io.tmpdir")}),

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
 
b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
index b539961..bd72d6d 100644
--- 
a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
+++ 
b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.{Executor, 
ScheduledExecutorService}
 
 import akka.actor.ActorRef
 import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.blob.BlobServer
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
@@ -58,6 +59,7 @@ class TestingYarnJobManager(
     ioExecutor: Executor,
     instanceManager: InstanceManager,
     scheduler: Scheduler,
+    blobServer: BlobServer,
     libraryCacheManager: BlobLibraryCacheManager,
     archive: ActorRef,
     restartStrategyFactory: RestartStrategyFactory,
@@ -73,6 +75,7 @@ class TestingYarnJobManager(
     ioExecutor,
     instanceManager,
     scheduler,
+    blobServer,
     libraryCacheManager,
     archive,
     restartStrategyFactory,

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala 
b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
index a2d1668..b8dacee 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
@@ -24,6 +24,7 @@ import java.util.concurrent.{Executor, 
ScheduledExecutorService, TimeUnit}
 import akka.actor.ActorRef
 import org.apache.flink.configuration.{Configuration => FlinkConfiguration}
 import org.apache.flink.core.fs.Path
+import org.apache.flink.runtime.blob.BlobServer
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
 import org.apache.flink.runtime.clusterframework.ContaineredJobManager
 import org.apache.flink.runtime.clusterframework.messages.StopCluster
@@ -49,7 +50,8 @@ import scala.language.postfixOps
   * @param instanceManager Instance manager to manage the registered
   *                        [[org.apache.flink.runtime.taskmanager.TaskManager]]
   * @param scheduler Scheduler to schedule Flink jobs
-  * @param libraryCacheManager Manager to manage uploaded jar files
+  * @param blobServer BLOB store for file uploads
+  * @param libraryCacheManager manages uploaded jar files and class paths
   * @param archive Archive for finished Flink jobs
   * @param restartStrategyFactory Restart strategy to be used in case of a job 
recovery
   * @param timeout Timeout for futures
@@ -61,6 +63,7 @@ class YarnJobManager(
     ioExecutor: Executor,
     instanceManager: InstanceManager,
     scheduler: FlinkScheduler,
+    blobServer: BlobServer,
     libraryCacheManager: BlobLibraryCacheManager,
     archive: ActorRef,
     restartStrategyFactory: RestartStrategyFactory,
@@ -76,6 +79,7 @@ class YarnJobManager(
     ioExecutor,
     instanceManager,
     scheduler,
+    blobServer,
     libraryCacheManager,
     archive,
     restartStrategyFactory,

Reply via email to