http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java index 23f0a38..933c7a0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.jobmanager; import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.BlobServerOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; @@ -27,7 +28,6 @@ import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.akka.ListeningBehaviour; import org.apache.flink.runtime.blob.BlobServer; -import org.apache.flink.runtime.blob.BlobService; import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.checkpoint.CheckpointMetrics; @@ -90,6 +90,7 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; + import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -158,6 +159,7 @@ public class JobManagerHARecoveryTest extends TestLogger { flinkConfiguration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper"); flinkConfiguration.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().toString()); flinkConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, slots); + 
flinkConfiguration.setLong(BlobServerOptions.CLEANUP_INTERVAL, 3_600L); try { Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext()); @@ -179,6 +181,9 @@ public class JobManagerHARecoveryTest extends TestLogger { archive = system.actorOf(JobManager.getArchiveProps(MemoryArchivist.class, 10, Option.<Path>empty())); + BlobServer blobServer = new BlobServer( + flinkConfiguration, + testingHighAvailabilityServices.createBlobStore()); Props jobManagerProps = Props.create( TestingJobManager.class, flinkConfiguration, @@ -186,11 +191,8 @@ public class JobManagerHARecoveryTest extends TestLogger { TestingUtils.defaultExecutor(), instanceManager, scheduler, - new BlobLibraryCacheManager( - new BlobServer( - flinkConfiguration, - testingHighAvailabilityServices.createBlobStore()), - 3600000L), + blobServer, + new BlobLibraryCacheManager(blobServer), archive, new FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(Int.MaxValue(), 100), timeout, @@ -353,6 +355,7 @@ public class JobManagerHARecoveryTest extends TestLogger { final Collection<JobID> recoveredJobs = new ArrayList<>(2); + BlobServer blobServer = mock(BlobServer.class); Props jobManagerProps = Props.create( TestingFailingHAJobManager.class, flinkConfiguration, @@ -360,7 +363,8 @@ public class JobManagerHARecoveryTest extends TestLogger { TestingUtils.defaultExecutor(), mock(InstanceManager.class), mock(Scheduler.class), - new BlobLibraryCacheManager(mock(BlobService.class), 1 << 20), + blobServer, + new BlobLibraryCacheManager(blobServer), ActorRef.noSender(), new FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(Int.MaxValue(), 100), timeout, @@ -397,6 +401,7 @@ public class JobManagerHARecoveryTest extends TestLogger { Executor ioExecutor, InstanceManager instanceManager, Scheduler scheduler, + BlobServer blobServer, BlobLibraryCacheManager libraryCacheManager, ActorRef archive, RestartStrategyFactory restartStrategyFactory, @@ -413,6 +418,7 @@ public class 
JobManagerHARecoveryTest extends TestLogger { ioExecutor, instanceManager, scheduler, + blobServer, libraryCacheManager, archive, restartStrategyFactory,
http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java index 3c75971..6a39293 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java @@ -137,14 +137,13 @@ public class JobSubmitTest { // upload two dummy bytes and add their keys to the job graph as dependencies BlobKey key1, key2; BlobClient bc = new BlobClient(new InetSocketAddress("localhost", blobPort), jmConfig); - // TODO: make use of job-related BLOBs after adapting the BlobLibraryCacheManager - JobID jobId = null; + JobID jobId = jg.getJobID(); try { key1 = bc.put(jobId, new byte[10]); key2 = bc.put(jobId, new byte[10]); // delete one of the blobs to make sure that the startup failed - bc.delete(key2); + bc.delete(jobId, key2); } finally { bc.close(); http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java index df35369..2c17b5a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobmaster; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; +import 
org.apache.flink.runtime.blob.BlobServer; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.concurrent.ScheduledExecutor; @@ -92,8 +93,8 @@ public class JobMasterTest extends TestLogger { final ScheduledExecutor scheduledExecutor = mock(ScheduledExecutor.class); final HeartbeatServices heartbeatServices = new TestingHeartbeatServices(heartbeatInterval, heartbeatTimeout, scheduledExecutor); - final BlobLibraryCacheManager libraryCacheManager = mock(BlobLibraryCacheManager.class); - when(libraryCacheManager.getBlobServerPort()).thenReturn(1337); + BlobServer blobServer = mock(BlobServer.class); + when(blobServer.getPort()).thenReturn(1337); final JobGraph jobGraph = new JobGraph(); @@ -106,7 +107,8 @@ public class JobMasterTest extends TestLogger { haServices, heartbeatServices, Executors.newScheduledThreadPool(1), - libraryCacheManager, + blobServer, + mock(BlobLibraryCacheManager.class), mock(RestartStrategyFactory.class), Time.of(10, TimeUnit.SECONDS), null, @@ -204,6 +206,7 @@ public class JobMasterTest extends TestLogger { haServices, heartbeatServices, Executors.newScheduledThreadPool(1), + mock(BlobServer.class), mock(BlobLibraryCacheManager.class), mock(RestartStrategyFactory.class), Time.of(10, TimeUnit.SECONDS), http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java index 70800e5..230ca91 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java @@ -25,9 +25,10 @@ import akka.actor.Props; import akka.pattern.Patterns; import akka.testkit.JavaTestKit; import akka.util.Timeout; - import org.apache.curator.framework.CuratorFramework; import org.apache.curator.test.TestingServer; +import org.apache.flink.configuration.BlobServerOptions; +import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.blob.BlobServer; @@ -47,13 +48,11 @@ import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.testutils.ZooKeeperTestUtils; import org.apache.flink.runtime.util.ZooKeeperUtils; import org.apache.flink.util.TestLogger; - import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; - import scala.Option; import scala.concurrent.Await; import scala.concurrent.Future; @@ -178,6 +177,9 @@ public class JobManagerLeaderElectionTest extends TestLogger { SubmittedJobGraphStore submittedJobGraphStore = new StandaloneSubmittedJobGraphStore(); CheckpointRecoveryFactory checkpointRecoveryFactory = new StandaloneCheckpointRecoveryFactory(); + configuration.setLong(BlobServerOptions.CLEANUP_INTERVAL, 1L); + + BlobServer blobServer = new BlobServer(configuration, new VoidBlobStore()); return Props.create( TestingJobManager.class, configuration, @@ -185,7 +187,8 @@ public class JobManagerLeaderElectionTest extends TestLogger { TestingUtils.defaultExecutor(), new InstanceManager(), new Scheduler(TestingUtils.defaultExecutionContext()), - new BlobLibraryCacheManager(new BlobServer(configuration, new VoidBlobStore()), 10L), + blobServer, + new BlobLibraryCacheManager(blobServer), ActorRef.noSender(), new NoRestartStrategy.NoRestartStrategyFactory(), 
AkkaUtils.getDefaultTimeoutAsFiniteDuration(), http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java index 43ff60b..6842bee 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.blob.BlobCache; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; import org.apache.flink.runtime.clusterframework.types.AllocationID; @@ -668,7 +669,7 @@ public class TaskExecutorTest extends TestLogger { Collections.<InputGateDeploymentDescriptor>emptyList()); final LibraryCacheManager libraryCacheManager = mock(LibraryCacheManager.class); - when(libraryCacheManager.getClassLoader(eq(jobId))).thenReturn(getClass().getClassLoader()); + when(libraryCacheManager.getClassLoader(any(JobID.class))).thenReturn(ClassLoader.getSystemClassLoader()); final JobManagerConnection jobManagerConnection = new JobManagerConnection( jobId, @@ -677,6 +678,7 @@ public class TaskExecutorTest extends TestLogger { jobManagerLeaderId, mock(TaskManagerActions.class), mock(CheckpointResponder.class), + mock(BlobCache.class), libraryCacheManager, mock(ResultPartitionConsumableNotifier.class), mock(PartitionProducerStateChecker.class)); @@ -1191,6 +1193,7 @@ public class TaskExecutorTest extends TestLogger { jobManagerLeaderId, 
mock(TaskManagerActions.class), mock(CheckpointResponder.class), + mock(BlobCache.class), libraryCacheManager, mock(ResultPartitionConsumableNotifier.class), mock(PartitionProducerStateChecker.class)); http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java index 085a386..392dc29 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.blob.BlobCache; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; @@ -145,6 +146,7 @@ public class TaskAsyncCallTest { } private static Task createTask() throws Exception { + BlobCache blobCache = mock(BlobCache.class); LibraryCacheManager libCache = mock(LibraryCacheManager.class); when(libCache.getClassLoader(any(JobID.class))).thenReturn(ClassLoader.getSystemClassLoader()); @@ -195,6 +197,7 @@ public class TaskAsyncCallTest { mock(TaskManagerActions.class), mock(InputSplitProvider.class), mock(CheckpointResponder.class), + blobCache, libCache, mock(FileCache.class), new TestingTaskManagerRuntimeInfo(), http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java 
---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java index 1ebd4ad..ac0df36 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.taskmanager; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.TaskInfo; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.blob.BlobCache; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; import org.apache.flink.runtime.clusterframework.types.AllocationID; @@ -98,6 +99,7 @@ public class TaskStopTest { mock(TaskManagerActions.class), mock(InputSplitProvider.class), mock(CheckpointResponder.class), + mock(BlobCache.class), mock(LibraryCacheManager.class), mock(FileCache.class), tmRuntimeInfo, http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java index ba3e820..d4cd0cf 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.core.testutils.OneShotLatch; +import 
org.apache.flink.runtime.blob.BlobCache; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; import org.apache.flink.runtime.clusterframework.types.AllocationID; @@ -227,7 +228,8 @@ public class TaskTest extends TestLogger { @Test public void testLibraryCacheRegistrationFailed() { try { - Task task = createTask(TestInvokableCorrect.class, mock(LibraryCacheManager.class)); + Task task = createTask(TestInvokableCorrect.class, mock(BlobCache.class), + mock(LibraryCacheManager.class)); // task should be new and perfect assertEquals(ExecutionState.CREATED, task.getExecutionState()); @@ -260,6 +262,7 @@ public class TaskTest extends TestLogger { @Test public void testExecutionFailsInNetworkRegistration() { try { + BlobCache blobCache = mock(BlobCache.class); // mock a working library cache LibraryCacheManager libCache = mock(LibraryCacheManager.class); when(libCache.getClassLoader(any(JobID.class))).thenReturn(getClass().getClassLoader()); @@ -274,7 +277,7 @@ public class TaskTest extends TestLogger { when(network.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC); doThrow(new RuntimeException("buffers")).when(network).registerTask(any(Task.class)); - Task task = createTask(TestInvokableCorrect.class, libCache, network, consumableNotifier, partitionProducerStateChecker, executor); + Task task = createTask(TestInvokableCorrect.class, blobCache, libCache, network, consumableNotifier, partitionProducerStateChecker, executor); task.registerExecutionListener(listener); @@ -617,6 +620,7 @@ public class TaskTest extends TestLogger { IntermediateDataSetID resultId = new IntermediateDataSetID(); ResultPartitionID partitionId = new ResultPartitionID(); + BlobCache blobCache = mock(BlobCache.class); LibraryCacheManager libCache = mock(LibraryCacheManager.class); when(libCache.getClassLoader(any(JobID.class))).thenReturn(getClass().getClassLoader()); @@ -629,7 +633,7 @@ public class TaskTest extends TestLogger { 
when(network.createKvStateTaskRegistry(any(JobID.class), any(JobVertexID.class))) .thenReturn(mock(TaskKvStateRegistry.class)); - createTask(InvokableBlockingInInvoke.class, libCache, network, consumableNotifier, partitionChecker, Executors.directExecutor()); + createTask(InvokableBlockingInInvoke.class, blobCache, libCache, network, consumableNotifier, partitionChecker, Executors.directExecutor()); // Test all branches of trigger partition state check @@ -638,7 +642,7 @@ public class TaskTest extends TestLogger { createQueuesAndActors(); // PartitionProducerDisposedException - Task task = createTask(InvokableBlockingInInvoke.class, libCache, network, consumableNotifier, partitionChecker, Executors.directExecutor()); + Task task = createTask(InvokableBlockingInInvoke.class, blobCache, libCache, network, consumableNotifier, partitionChecker, Executors.directExecutor()); CompletableFuture<ExecutionState> promise = new CompletableFuture<>(); when(partitionChecker.requestPartitionProducerState(eq(task.getJobID()), eq(resultId), eq(partitionId))).thenReturn(promise); @@ -654,7 +658,7 @@ public class TaskTest extends TestLogger { createQueuesAndActors(); // Any other exception - Task task = createTask(InvokableBlockingInInvoke.class, libCache, network, consumableNotifier, partitionChecker, Executors.directExecutor()); + Task task = createTask(InvokableBlockingInInvoke.class, blobCache, libCache, network, consumableNotifier, partitionChecker, Executors.directExecutor()); CompletableFuture<ExecutionState> promise = new CompletableFuture<>(); when(partitionChecker.requestPartitionProducerState(eq(task.getJobID()), eq(resultId), eq(partitionId))).thenReturn(promise); @@ -671,7 +675,7 @@ public class TaskTest extends TestLogger { createQueuesAndActors(); // TimeoutException handled special => retry - Task task = createTask(InvokableBlockingInInvoke.class, libCache, network, consumableNotifier, partitionChecker, Executors.directExecutor()); + Task task = 
createTask(InvokableBlockingInInvoke.class, blobCache, libCache, network, consumableNotifier, partitionChecker, Executors.directExecutor()); SingleInputGate inputGate = mock(SingleInputGate.class); when(inputGate.getConsumedResultId()).thenReturn(resultId); @@ -702,7 +706,7 @@ public class TaskTest extends TestLogger { createQueuesAndActors(); // Success - Task task = createTask(InvokableBlockingInInvoke.class, libCache, network, consumableNotifier, partitionChecker, Executors.directExecutor()); + Task task = createTask(InvokableBlockingInInvoke.class, blobCache, libCache, network, consumableNotifier, partitionChecker, Executors.directExecutor()); SingleInputGate inputGate = mock(SingleInputGate.class); when(inputGate.getConsumedResultId()).thenReturn(resultId); @@ -882,26 +886,30 @@ public class TaskTest extends TestLogger { } private Task createTask(Class<? extends AbstractInvokable> invokable, Configuration config) throws IOException { + BlobCache blobCache = mock(BlobCache.class); LibraryCacheManager libCache = mock(LibraryCacheManager.class); when(libCache.getClassLoader(any(JobID.class))).thenReturn(getClass().getClassLoader()); - return createTask(invokable, libCache, config, new ExecutionConfig()); + return createTask(invokable, blobCache,libCache, config, new ExecutionConfig()); } private Task createTask(Class<? extends AbstractInvokable> invokable, Configuration config, ExecutionConfig execConfig) throws IOException { + BlobCache blobCache = mock(BlobCache.class); LibraryCacheManager libCache = mock(LibraryCacheManager.class); when(libCache.getClassLoader(any(JobID.class))).thenReturn(getClass().getClassLoader()); - return createTask(invokable, libCache, config, execConfig); + return createTask(invokable, blobCache,libCache, config, execConfig); } private Task createTask( Class<? 
extends AbstractInvokable> invokable, + BlobCache blobCache, LibraryCacheManager libCache) throws IOException { - return createTask(invokable, libCache, new Configuration(), new ExecutionConfig()); + return createTask(invokable, blobCache,libCache, new Configuration(), new ExecutionConfig()); } private Task createTask( Class<? extends AbstractInvokable> invokable, + BlobCache blobCache, LibraryCacheManager libCache, Configuration config, ExecutionConfig execConfig) throws IOException { @@ -916,21 +924,23 @@ public class TaskTest extends TestLogger { when(network.createKvStateTaskRegistry(any(JobID.class), any(JobVertexID.class))) .thenReturn(mock(TaskKvStateRegistry.class)); - return createTask(invokable, libCache, network, consumableNotifier, partitionProducerStateChecker, executor, config, execConfig); + return createTask(invokable, blobCache, libCache, network, consumableNotifier, partitionProducerStateChecker, executor, config, execConfig); } private Task createTask( Class<? extends AbstractInvokable> invokable, + BlobCache blobCache, LibraryCacheManager libCache, NetworkEnvironment networkEnvironment, ResultPartitionConsumableNotifier consumableNotifier, PartitionProducerStateChecker partitionProducerStateChecker, Executor executor) throws IOException { - return createTask(invokable, libCache, networkEnvironment, consumableNotifier, partitionProducerStateChecker, executor, new Configuration(), new ExecutionConfig()); + return createTask(invokable, blobCache, libCache, networkEnvironment, consumableNotifier, partitionProducerStateChecker, executor, new Configuration(), new ExecutionConfig()); } private Task createTask( Class<? 
extends AbstractInvokable> invokable, + BlobCache blobCache, LibraryCacheManager libCache, NetworkEnvironment networkEnvironment, ResultPartitionConsumableNotifier consumableNotifier, @@ -991,6 +1001,7 @@ public class TaskTest extends TestLogger { taskManagerConnection, inputSplitProvider, checkpointResponder, + blobCache, libCache, mock(FileCache.class), new TestingTaskManagerRuntimeInfo(taskManagerConfig), http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/FailingBlockingInvokable.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/FailingBlockingInvokable.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/FailingBlockingInvokable.java new file mode 100644 index 0000000..37c141d --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/FailingBlockingInvokable.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.runtime.testtasks;
+
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+
+/**
+ * Task which blocks until the (static) {@link #unblock()} method is called and then fails with an
+ * exception.
+ *
+ * <p>The blocking flag is static, so one {@link #unblock()} call releases every instance of this
+ * task in the JVM, including instances whose {@link #invoke()} starts after the call. The flag is
+ * never reset by this class.
+ */
+public class FailingBlockingInvokable extends AbstractInvokable {
+	/** Monitor that guards {@link #blocking} and on which waiting tasks are parked. */
+	private static final Object lock = new Object();
+
+	/** Whether {@link #invoke()} keeps waiting; read and written only while holding {@link #lock}. */
+	private static boolean blocking = true;
+
+	/**
+	 * Waits until {@link #unblock()} has been called and then fails.
+	 *
+	 * @throws InterruptedException if the executing thread is interrupted while waiting
+	 * @throws RuntimeException always, as soon as the task has been unblocked
+	 */
+	@Override
+	public void invoke() throws Exception {
+		synchronized (lock) {
+			// The flag must be tested while holding the monitor. Testing it outside allows
+			// unblock() to flip the flag and notify between the test and wait(), in which
+			// case the notification is lost and this thread waits forever.
+			while (blocking) {
+				lock.wait();
+			}
+		}
+		throw new RuntimeException("This exception is expected.");
+	}
+
+	/**
+	 * Releases all tasks currently waiting in {@link #invoke()} and lets later invocations pass
+	 * straight through to the failure.
+	 */
+	public static void unblock() {
+		synchronized (lock) {
+			// Flip the flag and notify under the same monitor the waiters use for their check.
+			blocking = false;
+			lock.notifyAll();
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
index c1df5a3..229f1eb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
@@ -178,6 +179,7 @@ public class JvmExitOnFatalErrorTest {
 					new NoOpTaskManagerActions(),
 					new NoOpInputSplitProvider(),
 					new NoOpCheckpointResponder(),
+					mock(BlobCache.class),
 					new
FallbackLibraryCacheManager(), new FileCache(tmInfo.getTmpDirectories()), tmInfo, http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala index 1b9ee48..95da981 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala @@ -264,14 +264,15 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll with Befor components._1, components._2, components._3, - ActorRef.noSender, components._4, + ActorRef.noSender, components._5, + components._6, highAvailabilityServices.getJobManagerLeaderElectionService( HighAvailabilityServices.DEFAULT_JOB_ID), highAvailabilityServices.getSubmittedJobGraphStore(), highAvailabilityServices.getCheckpointRecoveryFactory(), - components._8, + components._9, None) _system.actorOf(props) http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala index e5655bb..87f8088 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala @@ -28,6 +28,7 @@ import akka.testkit.CallingThreadDispatcher import 
org.apache.flink.api.common.JobID import org.apache.flink.configuration.{Configuration, JobManagerOptions} import org.apache.flink.runtime.akka.AkkaUtils +import org.apache.flink.runtime.blob.BlobServer import org.apache.flink.runtime.checkpoint.savepoint.Savepoint import org.apache.flink.runtime.checkpoint.{CheckpointOptions, CheckpointRecoveryFactory} import org.apache.flink.runtime.clusterframework.FlinkResourceManager @@ -110,6 +111,7 @@ class TestingCluster( ioExecutor: Executor, instanceManager: InstanceManager, scheduler: Scheduler, + blobServer: BlobServer, libraryCacheManager: BlobLibraryCacheManager, archive: ActorRef, restartStrategyFactory: RestartStrategyFactory, @@ -127,6 +129,7 @@ class TestingCluster( ioExecutor, instanceManager, scheduler, + blobServer, libraryCacheManager, archive, restartStrategyFactory, http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala index f50a832..8b9ce15 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala @@ -22,6 +22,7 @@ import java.util.concurrent.{Executor, ScheduledExecutorService} import akka.actor.ActorRef import org.apache.flink.configuration.Configuration +import org.apache.flink.runtime.blob.BlobServer import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory @@ -34,15 +35,16 @@ import org.apache.flink.runtime.metrics.MetricRegistry import 
scala.concurrent.duration._ import scala.language.postfixOps -/** JobManager implementation extended by testing messages - * - */ +/** + * JobManager implementation extended by testing messages + */ class TestingJobManager( flinkConfiguration: Configuration, futureExecutor: ScheduledExecutorService, ioExecutor: Executor, instanceManager: InstanceManager, scheduler: Scheduler, + blobServer: BlobServer, libraryCacheManager: BlobLibraryCacheManager, archive: ActorRef, restartStrategyFactory: RestartStrategyFactory, @@ -58,6 +60,7 @@ class TestingJobManager( ioExecutor, instanceManager, scheduler, + blobServer, libraryCacheManager, archive, restartStrategyFactory, http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java index 3b8178b..82642ea 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java @@ -24,6 +24,7 @@ import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.blob.BlobCache; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; @@ -156,6 +157,7 @@ public class BlockingCheckpointsTest { mock(TaskManagerActions.class), mock(InputSplitProvider.class), 
mock(CheckpointResponder.class), + mock(BlobCache.class), new FallbackLibraryCacheManager(), new FileCache(new String[] { EnvironmentInformation.getTemporaryFileDirectory() }), new TestingTaskManagerRuntimeInfo(), http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java index 82e4f31..14ae733 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java @@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.blob.BlobCache; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; @@ -274,6 +275,7 @@ public class InterruptSensitiveRestoreTest { mock(TaskManagerActions.class), mock(InputSplitProvider.class), mock(CheckpointResponder.class), + mock(BlobCache.class), new FallbackLibraryCacheManager(), new FileCache(new String[] { EnvironmentInformation.getTemporaryFileDirectory() }), new TestingTaskManagerRuntimeInfo(), http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java 
---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java index 702d833..79e9583 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.blob.BlobCache; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; import org.apache.flink.runtime.checkpoint.CheckpointOptions; @@ -153,6 +154,7 @@ public class StreamTaskTerminationTest extends TestLogger { mock(TaskManagerActions.class), mock(InputSplitProvider.class), mock(CheckpointResponder.class), + mock(BlobCache.class), new FallbackLibraryCacheManager(), mock(FileCache.class), taskManagerRuntimeInfo, http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index 09e9a1b..08c3207 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -27,6 +27,7 @@ import 
org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.blob.BlobCache; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; @@ -796,6 +797,7 @@ public class StreamTaskTest extends TestLogger { StreamConfig taskConfig, Configuration taskManagerConfig) throws Exception { + BlobCache blobCache = mock(BlobCache.class); LibraryCacheManager libCache = mock(LibraryCacheManager.class); when(libCache.getClassLoader(any(JobID.class))).thenReturn(StreamTaskTest.class.getClassLoader()); @@ -844,6 +846,7 @@ public class StreamTaskTest extends TestLogger { mock(TaskManagerActions.class), mock(InputSplitProvider.class), mock(CheckpointResponder.class), + blobCache, libCache, mock(FileCache.class), new TestingTaskManagerRuntimeInfo(taskManagerConfig, new String[] {System.getProperty("java.io.tmpdir")}), http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala index b539961..bd72d6d 100644 --- a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala +++ b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala @@ -22,6 +22,7 @@ import java.util.concurrent.{Executor, ScheduledExecutorService} import akka.actor.ActorRef import org.apache.flink.configuration.Configuration +import org.apache.flink.runtime.blob.BlobServer import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory import 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory @@ -58,6 +59,7 @@ class TestingYarnJobManager( ioExecutor: Executor, instanceManager: InstanceManager, scheduler: Scheduler, + blobServer: BlobServer, libraryCacheManager: BlobLibraryCacheManager, archive: ActorRef, restartStrategyFactory: RestartStrategyFactory, @@ -73,6 +75,7 @@ class TestingYarnJobManager( ioExecutor, instanceManager, scheduler, + blobServer, libraryCacheManager, archive, restartStrategyFactory, http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala index a2d1668..b8dacee 100644 --- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala +++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala @@ -24,6 +24,7 @@ import java.util.concurrent.{Executor, ScheduledExecutorService, TimeUnit} import akka.actor.ActorRef import org.apache.flink.configuration.{Configuration => FlinkConfiguration} import org.apache.flink.core.fs.Path +import org.apache.flink.runtime.blob.BlobServer import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory import org.apache.flink.runtime.clusterframework.ContaineredJobManager import org.apache.flink.runtime.clusterframework.messages.StopCluster @@ -49,7 +50,8 @@ import scala.language.postfixOps * @param instanceManager Instance manager to manage the registered * [[org.apache.flink.runtime.taskmanager.TaskManager]] * @param scheduler Scheduler to schedule Flink jobs - * @param libraryCacheManager Manager to manage uploaded jar files + * @param blobServer BLOB store for file uploads + * @param libraryCacheManager manages uploaded jar files and 
class paths * @param archive Archive for finished Flink jobs * @param restartStrategyFactory Restart strategy to be used in case of a job recovery * @param timeout Timeout for futures @@ -61,6 +63,7 @@ class YarnJobManager( ioExecutor: Executor, instanceManager: InstanceManager, scheduler: FlinkScheduler, + blobServer: BlobServer, libraryCacheManager: BlobLibraryCacheManager, archive: ActorRef, restartStrategyFactory: RestartStrategyFactory, @@ -76,6 +79,7 @@ class YarnJobManager( ioExecutor, instanceManager, scheduler, + blobServer, libraryCacheManager, archive, restartStrategyFactory,