[
https://issues.apache.org/jira/browse/FLINK-10426?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16646546#comment-16646546
]
ASF GitHub Bot commented on FLINK-10426:
----------------------------------------
asfgit closed pull request #6778: [FLINK-10426] [tests] Port TaskTest to new
code base
URL: https://github.com/apache/flink/pull/6778
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index bb5cd17193a..5fc5d17494e 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -20,24 +20,29 @@
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.blob.BlobCacheService;
+import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.PermanentBlobCache;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.blob.TransientBlobCache;
+import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
+import
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.filecache.FileCache;
-import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
@@ -49,48 +54,46 @@
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.messages.TaskManagerMessages;
-import org.apache.flink.runtime.messages.TaskMessages;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.TestTaskStateManager;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
+import org.apache.flink.util.FlinkException;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.WrappingRuntimeException;
-
-import org.junit.After;
import org.junit.Before;
+import org.junit.ClassRule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
import javax.annotation.Nonnull;
-
import java.io.IOException;
import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeoutException;
-import scala.concurrent.duration.FiniteDuration;
-
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@@ -100,445 +103,383 @@
/**
* Tests for the Task, which make sure that correct state transitions happen,
* and failures are correctly handled.
- *
- * <p>All tests here have a set of mock actors for TaskManager, JobManager, and
- * execution listener, which simply put the messages in a queue to be picked
- * up by the test and validated.
*/
public class TaskTest extends TestLogger {
private static OneShotLatch awaitLatch;
private static OneShotLatch triggerLatch;
- private static OneShotLatch cancelLatch;
-
- private ActorGateway taskManagerGateway;
- private ActorGateway jobManagerGateway;
-
- private ActorGatewayTaskManagerActions taskManagerConnection;
- private BlockingQueue<Object> taskManagerMessages;
- private BlockingQueue<Object> jobManagerMessages;
- private BlockingQueue<TaskExecutionState> listenerMessages;
+ @ClassRule
+ public static final TemporaryFolder TEMPORARY_FOLDER = new
TemporaryFolder();
@Before
- public void createQueuesAndActors() {
- taskManagerMessages = new LinkedBlockingQueue<>();
- jobManagerMessages = new LinkedBlockingQueue<>();
- listenerMessages = new LinkedBlockingQueue<>();
- taskManagerGateway = new
ForwardingActorGateway(taskManagerMessages);
- jobManagerGateway = new
ForwardingActorGateway(jobManagerMessages);
-
- taskManagerConnection = new
ActorGatewayTaskManagerActions(taskManagerGateway) {
- @Override
- public void updateTaskExecutionState(TaskExecutionState
taskExecutionState) {
-
super.updateTaskExecutionState(taskExecutionState);
- listenerMessages.add(taskExecutionState);
- }
- };
-
+ public void setup() {
awaitLatch = new OneShotLatch();
triggerLatch = new OneShotLatch();
- cancelLatch = new OneShotLatch();
}
- @After
- public void clearActorsAndMessages() {
- jobManagerMessages = null;
- taskManagerMessages = null;
- listenerMessages = null;
-
- taskManagerGateway = null;
- jobManagerGateway = null;
- }
+ @Test
+ public void testRegularExecution() throws Exception {
+ final QueuedNoOpTaskManagerActions taskManagerActions = new
QueuedNoOpTaskManagerActions();
+ final Task task = new TaskBuilder()
+ .setTaskManagerActions(taskManagerActions)
+ .build();
+
+ // task should be new and perfect
+ assertEquals(ExecutionState.CREATED, task.getExecutionState());
+ assertFalse(task.isCanceledOrFailed());
+ assertNull(task.getFailureCause());
+
+ // go into the run method. we should switch to DEPLOYING,
RUNNING, then
+ // FINISHED, and all should be good
+ task.run();
- //
------------------------------------------------------------------------
- // Tests
- //
------------------------------------------------------------------------
+ // verify final state
+ assertEquals(ExecutionState.FINISHED, task.getExecutionState());
+ assertFalse(task.isCanceledOrFailed());
+ assertNull(task.getFailureCause());
+ assertNull(task.getInvokable());
- @Test
- public void testRegularExecution() {
- try {
- Task task = createTask(TestInvokableCorrect.class);
-
- // task should be new and perfect
- assertEquals(ExecutionState.CREATED,
task.getExecutionState());
- assertFalse(task.isCanceledOrFailed());
- assertNull(task.getFailureCause());
-
- // go into the run method. we should switch to
DEPLOYING, RUNNING, then
- // FINISHED, and all should be good
- task.run();
-
- // verify final state
- assertEquals(ExecutionState.FINISHED,
task.getExecutionState());
- assertFalse(task.isCanceledOrFailed());
- assertNull(task.getFailureCause());
- assertNull(task.getInvokable());
-
- // verify listener messages
- validateListenerMessage(ExecutionState.RUNNING, task,
false);
- validateListenerMessage(ExecutionState.FINISHED, task,
false);
-
- // make sure that the TaskManager received an message
to unregister the task
- validateTaskManagerStateChange(ExecutionState.RUNNING,
task, false);
- validateUnregisterTask(task.getExecutionId());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+
taskManagerActions.validateListenerMessage(ExecutionState.RUNNING, task, null);
+
taskManagerActions.validateListenerMessage(ExecutionState.FINISHED, task, null);
}
@Test
- public void testCancelRightAway() {
- try {
- Task task = createTask(TestInvokableCorrect.class);
- task.cancelExecution();
+ public void testCancelRightAway() throws Exception {
+ final Task task = new TaskBuilder().build();
+ task.cancelExecution();
- assertEquals(ExecutionState.CANCELING,
task.getExecutionState());
+ assertEquals(ExecutionState.CANCELING,
task.getExecutionState());
- task.run();
+ task.run();
- // verify final state
- assertEquals(ExecutionState.CANCELED,
task.getExecutionState());
- validateUnregisterTask(task.getExecutionId());
+ // verify final state
+ assertEquals(ExecutionState.CANCELED, task.getExecutionState());
- assertNull(task.getInvokable());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ assertNull(task.getInvokable());
}
@Test
- public void testFailExternallyRightAway() {
- try {
- Task task = createTask(TestInvokableCorrect.class);
- task.failExternally(new Exception("fail externally"));
+ public void testFailExternallyRightAway() throws Exception {
+ final Task task = new TaskBuilder().build();
+ task.failExternally(new Exception("fail externally"));
- assertEquals(ExecutionState.FAILED,
task.getExecutionState());
+ assertEquals(ExecutionState.FAILED, task.getExecutionState());
- task.run();
+ task.run();
- // verify final state
- assertEquals(ExecutionState.FAILED,
task.getExecutionState());
- validateUnregisterTask(task.getExecutionId());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ // verify final state
+ assertEquals(ExecutionState.FAILED, task.getExecutionState());
}
@Test
- public void testLibraryCacheRegistrationFailed() {
- try {
- BlobCacheService blobService = createBlobCache();
- Task task = createTask(TestInvokableCorrect.class,
blobService,
- mock(LibraryCacheManager.class));
+ public void testLibraryCacheRegistrationFailed() throws Exception {
+ final QueuedNoOpTaskManagerActions taskManagerActions = new
QueuedNoOpTaskManagerActions();
+ final Task task = new TaskBuilder()
+ .setTaskManagerActions(taskManagerActions)
+
.setLibraryCacheManager(mock(LibraryCacheManager.class)) // inactive manager
+ .build();
+
+ // task should be new and perfect
+ assertEquals(ExecutionState.CREATED, task.getExecutionState());
+ assertFalse(task.isCanceledOrFailed());
+ assertNull(task.getFailureCause());
+
+ // should fail
+ task.run();
- // task should be new and perfect
- assertEquals(ExecutionState.CREATED,
task.getExecutionState());
- assertFalse(task.isCanceledOrFailed());
- assertNull(task.getFailureCause());
+ // verify final state
+ assertEquals(ExecutionState.FAILED, task.getExecutionState());
+ assertTrue(task.isCanceledOrFailed());
+ assertNotNull(task.getFailureCause());
+ assertNotNull(task.getFailureCause().getMessage());
+
assertTrue(task.getFailureCause().getMessage().contains("classloader"));
- // should fail
- task.run();
+ assertNull(task.getInvokable());
- // verify final state
- assertEquals(ExecutionState.FAILED,
task.getExecutionState());
- assertTrue(task.isCanceledOrFailed());
- assertNotNull(task.getFailureCause());
- assertNotNull(task.getFailureCause().getMessage());
-
assertTrue(task.getFailureCause().getMessage().contains("classloader"));
+ taskManagerActions.validateListenerMessage(
+ ExecutionState.FAILED, task, new Exception("No user
code classloader available."));
+ }
- // verify listener messages
- validateListenerMessage(ExecutionState.FAILED, task,
true);
+ @Test
+ public void testExecutionFailsInBlobsMissing() throws Exception {
+ final PermanentBlobKey missingKey = new PermanentBlobKey();
+
+ final Configuration config = new Configuration();
+ config.setString(BlobServerOptions.STORAGE_DIRECTORY,
+ TEMPORARY_FOLDER.newFolder().getAbsolutePath());
+ config.setLong(BlobServerOptions.CLEANUP_INTERVAL, 1L);
+
+ final BlobServer blobServer = new BlobServer(config, new
VoidBlobStore());
+ blobServer.start();
+ InetSocketAddress serverAddress = new
InetSocketAddress("localhost", blobServer.getPort());
+ final PermanentBlobCache permanentBlobCache = new
PermanentBlobCache(config, new VoidBlobStore(), serverAddress);
+
+ final BlobLibraryCacheManager libraryCacheManager =
+ new BlobLibraryCacheManager(
+ permanentBlobCache,
+
FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST,
+ new String[0]);
+
+ final Task task = new TaskBuilder()
+
.setRequiredJarFileBlobKeys(Collections.singletonList(missingKey))
+ .setLibraryCacheManager(libraryCacheManager)
+ .build();
+
+ // task should be new and perfect
+ assertEquals(ExecutionState.CREATED, task.getExecutionState());
+ assertFalse(task.isCanceledOrFailed());
+ assertNull(task.getFailureCause());
+
+ // should fail
+ task.run();
- // make sure that the TaskManager received an message
to unregister the task
- validateUnregisterTask(task.getExecutionId());
+ // verify final state
+ assertEquals(ExecutionState.FAILED, task.getExecutionState());
+ assertTrue(task.isCanceledOrFailed());
+ assertNotNull(task.getFailureCause());
+ assertNotNull(task.getFailureCause().getMessage());
+ assertTrue(task.getFailureCause().getMessage().contains("Failed
to fetch BLOB"));
- assertNull(task.getInvokable());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ assertNull(task.getInvokable());
}
@Test
- public void testExecutionFailsInNetworkRegistration() {
- try {
- BlobCacheService blobService = createBlobCache();
- // mock a working library cache
- LibraryCacheManager libCache =
mock(LibraryCacheManager.class);
-
when(libCache.getClassLoader(any(JobID.class))).thenReturn(getClass().getClassLoader());
-
- // mock a network manager that rejects registration
- ResultPartitionManager partitionManager =
mock(ResultPartitionManager.class);
- ResultPartitionConsumableNotifier consumableNotifier =
mock(ResultPartitionConsumableNotifier.class);
- PartitionProducerStateChecker
partitionProducerStateChecker = mock(PartitionProducerStateChecker.class);
- TaskEventDispatcher taskEventDispatcher =
mock(TaskEventDispatcher.class);
- Executor executor = mock(Executor.class);
- NetworkEnvironment network =
mock(NetworkEnvironment.class);
-
when(network.getResultPartitionManager()).thenReturn(partitionManager);
-
when(network.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC);
-
when(network.getTaskEventDispatcher()).thenReturn(taskEventDispatcher);
- doThrow(new
RuntimeException("buffers")).when(network).registerTask(any(Task.class));
-
- Task task = createTask(TestInvokableCorrect.class,
blobService, libCache, network, consumableNotifier,
partitionProducerStateChecker, executor);
-
- task.run();
+ public void testExecutionFailsInNetworkRegistration() throws Exception {
+ // mock a network manager that rejects registration
+ final ResultPartitionManager partitionManager =
mock(ResultPartitionManager.class);
+ final ResultPartitionConsumableNotifier consumableNotifier =
mock(ResultPartitionConsumableNotifier.class);
+ final PartitionProducerStateChecker
partitionProducerStateChecker = mock(PartitionProducerStateChecker.class);
+ final TaskEventDispatcher taskEventDispatcher =
mock(TaskEventDispatcher.class);
+
+ final NetworkEnvironment network =
mock(NetworkEnvironment.class);
+
when(network.getResultPartitionManager()).thenReturn(partitionManager);
+
when(network.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC);
+
when(network.getTaskEventDispatcher()).thenReturn(taskEventDispatcher);
+ doThrow(new
RuntimeException("buffers")).when(network).registerTask(any(Task.class));
- assertEquals(ExecutionState.FAILED,
task.getExecutionState());
- assertTrue(task.isCanceledOrFailed());
-
assertTrue(task.getFailureCause().getMessage().contains("buffers"));
+ final QueuedNoOpTaskManagerActions taskManagerActions = new
QueuedNoOpTaskManagerActions();
+ final Task task = new TaskBuilder()
+ .setTaskManagerActions(taskManagerActions)
+ .setConsumableNotifier(consumableNotifier)
+
.setPartitionProducerStateChecker(partitionProducerStateChecker)
+ .setNetworkEnvironment(network)
+ .build();
- validateUnregisterTask(task.getExecutionId());
- validateListenerMessage(ExecutionState.FAILED, task,
true);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ // should fail
+ task.run();
+
+ // verify final state
+ assertEquals(ExecutionState.FAILED, task.getExecutionState());
+ assertTrue(task.isCanceledOrFailed());
+
assertTrue(task.getFailureCause().getMessage().contains("buffers"));
+
+ taskManagerActions.validateListenerMessage(
+ ExecutionState.FAILED, task, new
RuntimeException("buffers"));
}
@Test
- public void testInvokableInstantiationFailed() {
- try {
- Task task = createTask(InvokableNonInstantiable.class);
-
- task.run();
+ public void testInvokableInstantiationFailed() throws Exception {
+ final QueuedNoOpTaskManagerActions taskManagerActions = new
QueuedNoOpTaskManagerActions();
+ final Task task = new TaskBuilder()
+ .setTaskManagerActions(taskManagerActions)
+ .setInvokable(InvokableNonInstantiable.class)
+ .build();
+
+ // should fail
+ task.run();
- assertEquals(ExecutionState.FAILED,
task.getExecutionState());
- assertTrue(task.isCanceledOrFailed());
-
assertTrue(task.getFailureCause().getMessage().contains("instantiate"));
+ // verify final state
+ assertEquals(ExecutionState.FAILED, task.getExecutionState());
+ assertTrue(task.isCanceledOrFailed());
+
assertTrue(task.getFailureCause().getMessage().contains("instantiate"));
- validateUnregisterTask(task.getExecutionId());
- validateListenerMessage(ExecutionState.FAILED, task,
true);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ taskManagerActions.validateListenerMessage(
+ ExecutionState.FAILED, task, new FlinkException("Could
not instantiate the task's invokable class."));
}
@Test
- public void testExecutionFailsInInvoke() {
- try {
- Task task =
createTask(InvokableWithExceptionInInvoke.class);
-
- task.run();
+ public void testExecutionFailsInInvoke() throws Exception {
+ final QueuedNoOpTaskManagerActions taskManagerActions = new
QueuedNoOpTaskManagerActions();
+ final Task task = new TaskBuilder()
+ .setInvokable(InvokableWithExceptionInInvoke.class)
+ .setTaskManagerActions(taskManagerActions)
+ .build();
- assertEquals(ExecutionState.FAILED,
task.getExecutionState());
- assertTrue(task.isCanceledOrFailed());
- assertNotNull(task.getFailureCause());
- assertNotNull(task.getFailureCause().getMessage());
-
assertTrue(task.getFailureCause().getMessage().contains("test"));
+ task.run();
- validateTaskManagerStateChange(ExecutionState.RUNNING,
task, false);
- validateUnregisterTask(task.getExecutionId());
+ assertEquals(ExecutionState.FAILED, task.getExecutionState());
+ assertTrue(task.isCanceledOrFailed());
+ assertNotNull(task.getFailureCause());
+ assertNotNull(task.getFailureCause().getMessage());
+
assertTrue(task.getFailureCause().getMessage().contains("test"));
- validateListenerMessage(ExecutionState.RUNNING, task,
false);
- validateListenerMessage(ExecutionState.FAILED, task,
true);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+
taskManagerActions.validateListenerMessage(ExecutionState.RUNNING, task, null);
+
taskManagerActions.validateListenerMessage(ExecutionState.FAILED, task, new
Exception("test"));
}
@Test
- public void testFailWithWrappedException() {
- try {
- Task task =
createTask(FailingInvokableWithChainedException.class);
-
- task.run();
+ public void testFailWithWrappedException() throws Exception {
+ final QueuedNoOpTaskManagerActions taskManagerActions = new
QueuedNoOpTaskManagerActions();
+ final Task task = new TaskBuilder()
+
.setInvokable(FailingInvokableWithChainedException.class)
+ .setTaskManagerActions(taskManagerActions)
+ .build();
- assertEquals(ExecutionState.FAILED,
task.getExecutionState());
- assertTrue(task.isCanceledOrFailed());
+ task.run();
- Throwable cause = task.getFailureCause();
- assertTrue(cause instanceof IOException);
+ assertEquals(ExecutionState.FAILED, task.getExecutionState());
+ assertTrue(task.isCanceledOrFailed());
- validateTaskManagerStateChange(ExecutionState.RUNNING,
task, false);
- validateUnregisterTask(task.getExecutionId());
+ final Throwable cause = task.getFailureCause();
+ assertTrue(cause instanceof IOException);
- validateListenerMessage(ExecutionState.RUNNING, task,
false);
- validateListenerMessage(ExecutionState.FAILED, task,
true);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+
taskManagerActions.validateListenerMessage(ExecutionState.RUNNING, task, null);
+
taskManagerActions.validateListenerMessage(ExecutionState.FAILED, task, new
IOException("test"));
}
@Test
- public void testCancelDuringInvoke() {
- try {
- Task task = createTask(InvokableBlockingInInvoke.class);
-
- // run the task asynchronous
- task.startTaskThread();
-
- // wait till the task is in invoke
- awaitLatch.await();
+ public void testCancelDuringInvoke() throws Exception {
+ final QueuedNoOpTaskManagerActions taskManagerActions = new
QueuedNoOpTaskManagerActions();
+ final Task task = new TaskBuilder()
+ .setInvokable(InvokableBlockingInInvoke.class)
+ .setTaskManagerActions(taskManagerActions)
+ .build();
+
+ // run the task asynchronous
+ task.startTaskThread();
- task.cancelExecution();
- assertTrue(task.getExecutionState() ==
ExecutionState.CANCELING ||
- task.getExecutionState() ==
ExecutionState.CANCELED);
+ // wait till the task is in invoke
+ awaitLatch.await();
- task.getExecutingThread().join();
+ task.cancelExecution();
+ assertTrue(task.getExecutionState() == ExecutionState.CANCELING
||
+ task.getExecutionState() == ExecutionState.CANCELED);
- assertEquals(ExecutionState.CANCELED,
task.getExecutionState());
- assertTrue(task.isCanceledOrFailed());
- assertNull(task.getFailureCause());
+ task.getExecutingThread().join();
- validateTaskManagerStateChange(ExecutionState.RUNNING,
task, false);
- validateUnregisterTask(task.getExecutionId());
+ assertEquals(ExecutionState.CANCELED, task.getExecutionState());
+ assertTrue(task.isCanceledOrFailed());
+ assertNull(task.getFailureCause());
- validateListenerMessage(ExecutionState.RUNNING, task,
false);
- validateListenerMessage(ExecutionState.CANCELED, task,
false);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+
taskManagerActions.validateListenerMessage(ExecutionState.RUNNING, task, null);
+
taskManagerActions.validateListenerMessage(ExecutionState.CANCELED, task, null);
}
@Test
- public void testFailExternallyDuringInvoke() {
- try {
- Task task = createTask(InvokableBlockingInInvoke.class);
-
- // run the task asynchronous
- task.startTaskThread();
-
- // wait till the task is in regInOut
- awaitLatch.await();
+ public void testFailExternallyDuringInvoke() throws Exception {
+ final QueuedNoOpTaskManagerActions taskManagerActions = new
QueuedNoOpTaskManagerActions();
+ final Task task = new TaskBuilder()
+ .setInvokable(InvokableBlockingInInvoke.class)
+ .setTaskManagerActions(taskManagerActions)
+ .build();
+
+ // run the task asynchronous
+ task.startTaskThread();
- task.failExternally(new Exception("test"));
- assertEquals(ExecutionState.FAILED,
task.getExecutionState());
+ // wait till the task is in invoke
+ awaitLatch.await();
- task.getExecutingThread().join();
+ task.failExternally(new Exception("test"));
- assertEquals(ExecutionState.FAILED,
task.getExecutionState());
- assertTrue(task.isCanceledOrFailed());
-
assertTrue(task.getFailureCause().getMessage().contains("test"));
+ task.getExecutingThread().join();
- validateTaskManagerStateChange(ExecutionState.RUNNING,
task, false);
- validateUnregisterTask(task.getExecutionId());
+ assertEquals(ExecutionState.FAILED, task.getExecutionState());
+ assertTrue(task.isCanceledOrFailed());
+
assertTrue(task.getFailureCause().getMessage().contains("test"));
- validateListenerMessage(ExecutionState.RUNNING, task,
false);
- validateListenerMessage(ExecutionState.FAILED, task,
true);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+
taskManagerActions.validateListenerMessage(ExecutionState.RUNNING, task, null);
+
taskManagerActions.validateListenerMessage(ExecutionState.FAILED, task, new
Exception("test"));
}
@Test
- public void testCanceledAfterExecutionFailedInInvoke() {
- try {
- Task task =
createTask(InvokableWithExceptionInInvoke.class);
-
- task.run();
+ public void testCanceledAfterExecutionFailedInInvoke() throws Exception
{
+ final QueuedNoOpTaskManagerActions taskManagerActions = new
QueuedNoOpTaskManagerActions();
+ final Task task = new TaskBuilder()
+ .setInvokable(InvokableWithExceptionInInvoke.class)
+ .setTaskManagerActions(taskManagerActions)
+ .build();
- // this should not overwrite the failure state
- task.cancelExecution();
+ task.run();
- assertEquals(ExecutionState.FAILED,
task.getExecutionState());
- assertTrue(task.isCanceledOrFailed());
-
assertTrue(task.getFailureCause().getMessage().contains("test"));
+ // this should not overwrite the failure state
+ task.cancelExecution();
- validateTaskManagerStateChange(ExecutionState.RUNNING,
task, false);
- validateUnregisterTask(task.getExecutionId());
+ assertEquals(ExecutionState.FAILED, task.getExecutionState());
+ assertTrue(task.isCanceledOrFailed());
+
assertTrue(task.getFailureCause().getMessage().contains("test"));
- validateListenerMessage(ExecutionState.RUNNING, task,
false);
- validateListenerMessage(ExecutionState.FAILED, task,
true);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+
taskManagerActions.validateListenerMessage(ExecutionState.RUNNING, task, null);
+
taskManagerActions.validateListenerMessage(ExecutionState.FAILED, task, new
Exception("test"));
}
@Test
- public void testExecutionFailsAfterCanceling() {
- try {
- Task task =
createTask(InvokableWithExceptionOnTrigger.class);
-
- // run the task asynchronous
- task.startTaskThread();
-
- // wait till the task is in invoke
- awaitLatch.await();
+ public void testExecutionFailsAfterCanceling() throws Exception {
+ final QueuedNoOpTaskManagerActions taskManagerActions = new
QueuedNoOpTaskManagerActions();
+ final Task task = new TaskBuilder()
+ .setInvokable(InvokableWithExceptionOnTrigger.class)
+ .setTaskManagerActions(taskManagerActions)
+ .build();
+
+ // run the task asynchronous
+ task.startTaskThread();
- task.cancelExecution();
- assertEquals(ExecutionState.CANCELING,
task.getExecutionState());
+ // wait till the task is in invoke
+ awaitLatch.await();
- // this causes an exception
- triggerLatch.trigger();
+ task.cancelExecution();
+ assertEquals(ExecutionState.CANCELING,
task.getExecutionState());
- task.getExecutingThread().join();
+ // this causes an exception
+ triggerLatch.trigger();
- // we should still be in state canceled
- assertEquals(ExecutionState.CANCELED,
task.getExecutionState());
- assertTrue(task.isCanceledOrFailed());
- assertNull(task.getFailureCause());
+ task.getExecutingThread().join();
- validateTaskManagerStateChange(ExecutionState.RUNNING,
task, false);
- validateUnregisterTask(task.getExecutionId());
+ // we should still be in state canceled
+ assertEquals(ExecutionState.CANCELED, task.getExecutionState());
+ assertTrue(task.isCanceledOrFailed());
+ assertNull(task.getFailureCause());
- validateListenerMessage(ExecutionState.RUNNING, task,
false);
- validateListenerMessage(ExecutionState.CANCELED, task,
false);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+
taskManagerActions.validateListenerMessage(ExecutionState.RUNNING, task, null);
+
taskManagerActions.validateListenerMessage(ExecutionState.CANCELED, task, null);
}
@Test
- public void testExecutionFailsAfterTaskMarkedFailed() {
- try {
- Task task =
createTask(InvokableWithExceptionOnTrigger.class);
-
- // run the task asynchronous
- task.startTaskThread();
-
- // wait till the task is in invoke
- awaitLatch.await();
+ public void testExecutionFailsAfterTaskMarkedFailed() throws Exception {
+ final QueuedNoOpTaskManagerActions taskManagerActions = new
QueuedNoOpTaskManagerActions();
+ final Task task = new TaskBuilder()
+ .setInvokable(InvokableWithExceptionOnTrigger.class)
+ .setTaskManagerActions(taskManagerActions)
+ .build();
+
+ // run the task asynchronous
+ task.startTaskThread();
- task.failExternally(new Exception("external"));
- assertEquals(ExecutionState.FAILED,
task.getExecutionState());
+ // wait till the task is in invoke
+ awaitLatch.await();
- // this causes an exception
- triggerLatch.trigger();
+ task.failExternally(new Exception("external"));
+ assertEquals(ExecutionState.FAILED, task.getExecutionState());
- task.getExecutingThread().join();
+ // this causes an exception
+ triggerLatch.trigger();
- assertEquals(ExecutionState.FAILED,
task.getExecutionState());
- assertTrue(task.isCanceledOrFailed());
-
assertTrue(task.getFailureCause().getMessage().contains("external"));
+ task.getExecutingThread().join();
- validateTaskManagerStateChange(ExecutionState.RUNNING,
task, false);
- validateUnregisterTask(task.getExecutionId());
+ assertEquals(ExecutionState.FAILED, task.getExecutionState());
+ assertTrue(task.isCanceledOrFailed());
+
assertTrue(task.getFailureCause().getMessage().contains("external"));
- validateListenerMessage(ExecutionState.RUNNING, task,
false);
- validateListenerMessage(ExecutionState.FAILED, task,
true);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+
taskManagerActions.validateListenerMessage(ExecutionState.RUNNING, task, null);
+
taskManagerActions.validateListenerMessage(ExecutionState.FAILED, task, new
Exception("external"));
}
+
@Test
public void testCancelTaskException() throws Exception {
- final Task task =
createTask(InvokableWithCancelTaskExceptionInInvoke.class);
+ final Task task = new TaskBuilder()
+
.setInvokable(InvokableWithCancelTaskExceptionInInvoke.class)
+ .build();
// Cause CancelTaskException.
triggerLatch.trigger();
@@ -550,7 +491,9 @@ public void testCancelTaskException() throws Exception {
@Test
public void testCancelTaskExceptionAfterTaskMarkedFailed() throws
Exception {
- final Task task =
createTask(InvokableWithCancelTaskExceptionInInvoke.class);
+ final Task task = new TaskBuilder()
+
.setInvokable(InvokableWithCancelTaskExceptionInInvoke.class)
+ .build();
task.startTaskThread();
@@ -573,13 +516,15 @@ public void
testCancelTaskExceptionAfterTaskMarkedFailed() throws Exception {
@Test
public void testOnPartitionStateUpdate() throws Exception {
- IntermediateDataSetID resultId = new IntermediateDataSetID();
- ResultPartitionID partitionId = new ResultPartitionID();
+ final IntermediateDataSetID resultId = new
IntermediateDataSetID();
+ final ResultPartitionID partitionId = new ResultPartitionID();
- SingleInputGate inputGate = mock(SingleInputGate.class);
+ final SingleInputGate inputGate = mock(SingleInputGate.class);
when(inputGate.getConsumedResultId()).thenReturn(resultId);
- final Task task = createTask(InvokableBlockingInInvoke.class);
+ final Task task = new TaskBuilder()
+ .setInvokable(InvokableBlockingInInvoke.class)
+ .build();
// Set the mock input gate
setInputGate(task, inputGate);
@@ -619,36 +564,36 @@ public void testOnPartitionStateUpdate() throws Exception
{
*/
@Test
public void testTriggerPartitionStateUpdate() throws Exception {
- IntermediateDataSetID resultId = new IntermediateDataSetID();
- ResultPartitionID partitionId = new ResultPartitionID();
-
- BlobCacheService blobService = createBlobCache();
- LibraryCacheManager libCache = mock(LibraryCacheManager.class);
-
when(libCache.getClassLoader(any(JobID.class))).thenReturn(getClass().getClassLoader());
+ final IntermediateDataSetID resultId = new
IntermediateDataSetID();
+ final ResultPartitionID partitionId = new ResultPartitionID();
- PartitionProducerStateChecker partitionChecker =
mock(PartitionProducerStateChecker.class);
- TaskEventDispatcher taskEventDispatcher =
mock(TaskEventDispatcher.class);
+ final PartitionProducerStateChecker partitionChecker =
mock(PartitionProducerStateChecker.class);
+ final TaskEventDispatcher taskEventDispatcher =
mock(TaskEventDispatcher.class);
- ResultPartitionConsumableNotifier consumableNotifier =
mock(ResultPartitionConsumableNotifier.class);
- NetworkEnvironment network = mock(NetworkEnvironment.class);
+ final ResultPartitionConsumableNotifier consumableNotifier =
mock(ResultPartitionConsumableNotifier.class);
+ final NetworkEnvironment network =
mock(NetworkEnvironment.class);
when(network.getResultPartitionManager()).thenReturn(mock(ResultPartitionManager.class));
when(network.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC);
when(network.createKvStateTaskRegistry(any(JobID.class),
any(JobVertexID.class)))
.thenReturn(mock(TaskKvStateRegistry.class));
when(network.getTaskEventDispatcher()).thenReturn(taskEventDispatcher);
- createTask(InvokableBlockingInInvoke.class, blobService,
libCache, network, consumableNotifier, partitionChecker,
Executors.directExecutor());
// Test all branches of trigger partition state check
-
{
// Reset latches
- createQueuesAndActors();
+ setup();
// PartitionProducerDisposedException
- Task task = createTask(InvokableBlockingInInvoke.class,
blobService, libCache, network, consumableNotifier, partitionChecker,
Executors.directExecutor());
-
- CompletableFuture<ExecutionState> promise = new
CompletableFuture<>();
+ final Task task = new TaskBuilder()
+ .setInvokable(InvokableBlockingInInvoke.class)
+ .setNetworkEnvironment(network)
+ .setConsumableNotifier(consumableNotifier)
+
.setPartitionProducerStateChecker(partitionChecker)
+ .setExecutor(Executors.directExecutor())
+ .build();
+
+ final CompletableFuture<ExecutionState> promise = new
CompletableFuture<>();
when(partitionChecker.requestPartitionProducerState(eq(task.getJobID()),
eq(resultId), eq(partitionId))).thenReturn(promise);
task.triggerPartitionProducerStateCheck(task.getJobID(), resultId, partitionId);
@@ -659,12 +604,18 @@ public void testTriggerPartitionStateUpdate() throws
Exception {
{
// Reset latches
- createQueuesAndActors();
+ setup();
// Any other exception
- Task task = createTask(InvokableBlockingInInvoke.class,
blobService, libCache, network, consumableNotifier, partitionChecker,
Executors.directExecutor());
-
- CompletableFuture<ExecutionState> promise = new
CompletableFuture<>();
+ final Task task = new TaskBuilder()
+ .setInvokable(InvokableBlockingInInvoke.class)
+ .setNetworkEnvironment(network)
+ .setConsumableNotifier(consumableNotifier)
+
.setPartitionProducerStateChecker(partitionChecker)
+ .setExecutor(Executors.directExecutor())
+ .build();
+
+ final CompletableFuture<ExecutionState> promise = new
CompletableFuture<>();
when(partitionChecker.requestPartitionProducerState(eq(task.getJobID()),
eq(resultId), eq(partitionId))).thenReturn(promise);
task.triggerPartitionProducerStateCheck(task.getJobID(), resultId, partitionId);
@@ -676,11 +627,19 @@ public void testTriggerPartitionStateUpdate() throws
Exception {
{
// Reset latches
- createQueuesAndActors();
+ setup();
// TimeoutException handled special => retry
- Task task = createTask(InvokableBlockingInInvoke.class,
blobService, libCache, network, consumableNotifier, partitionChecker,
Executors.directExecutor());
- SingleInputGate inputGate = mock(SingleInputGate.class);
+ // Any other exception
+ final Task task = new TaskBuilder()
+ .setInvokable(InvokableBlockingInInvoke.class)
+ .setNetworkEnvironment(network)
+ .setConsumableNotifier(consumableNotifier)
+
.setPartitionProducerStateChecker(partitionChecker)
+ .setExecutor(Executors.directExecutor())
+ .build();
+
+ final SingleInputGate inputGate =
mock(SingleInputGate.class);
when(inputGate.getConsumedResultId()).thenReturn(resultId);
try {
@@ -707,11 +666,18 @@ public void testTriggerPartitionStateUpdate() throws
Exception {
{
// Reset latches
- createQueuesAndActors();
+ setup();
// Success
- Task task = createTask(InvokableBlockingInInvoke.class,
blobService, libCache, network, consumableNotifier, partitionChecker,
Executors.directExecutor());
- SingleInputGate inputGate = mock(SingleInputGate.class);
+ final Task task = new TaskBuilder()
+ .setInvokable(InvokableBlockingInInvoke.class)
+ .setNetworkEnvironment(network)
+ .setConsumableNotifier(consumableNotifier)
+
.setPartitionProducerStateChecker(partitionChecker)
+ .setExecutor(Executors.directExecutor())
+ .build();
+
+ final SingleInputGate inputGate =
mock(SingleInputGate.class);
when(inputGate.getConsumedResultId()).thenReturn(resultId);
try {
@@ -744,22 +710,24 @@ public void testTriggerPartitionStateUpdate() throws
Exception {
*/
@Test
public void testWatchDogInterruptsTask() throws Exception {
- Configuration config = new Configuration();
+ final TaskManagerActions taskManagerActions = new
ProhibitFatalErrorTaskManagerActions();
+
+ final Configuration config = new Configuration();
config.setLong(TaskManagerOptions.TASK_CANCELLATION_INTERVAL.key(), 5);
config.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT.key(), 60 * 1000);
- Task task = createTask(InvokableBlockingInCancel.class, config);
+ final Task task = new TaskBuilder()
+ .setInvokable(InvokableBlockingInCancel.class)
+ .setTaskManagerConfig(config)
+ .setTaskManagerActions(taskManagerActions)
+ .build();
+
task.startTaskThread();
awaitLatch.await();
task.cancelExecution();
task.getExecutingThread().join();
-
- // No fatal error
- for (Object msg : taskManagerMessages) {
- assertFalse("Unexpected FatalError message", msg
instanceof TaskManagerMessages.FatalError);
- }
}
/**
@@ -768,23 +736,25 @@ public void testWatchDogInterruptsTask() throws Exception
{
* This is resolved by the watch dog, no fatal error.
*/
@Test
- public void testInterruptableSharedLockInInvokeAndCancel() throws
Exception {
- Configuration config = new Configuration();
+ public void testInterruptibleSharedLockInInvokeAndCancel() throws
Exception {
+ final TaskManagerActions taskManagerActions = new
ProhibitFatalErrorTaskManagerActions();
+
+ final Configuration config = new Configuration();
config.setLong(TaskManagerOptions.TASK_CANCELLATION_INTERVAL,
5);
config.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT,
50);
- Task task =
createTask(InvokableInterruptableSharedLockInInvokeAndCancel.class, config);
+ final Task task = new TaskBuilder()
+
.setInvokable(InvokableInterruptibleSharedLockInInvokeAndCancel.class)
+ .setTaskManagerConfig(config)
+ .setTaskManagerActions(taskManagerActions)
+ .build();
+
task.startTaskThread();
awaitLatch.await();
task.cancelExecution();
task.getExecutingThread().join();
-
- // No fatal error
- for (Object msg : taskManagerMessages) {
- assertFalse("Unexpected FatalError message", msg
instanceof TaskManagerMessages.FatalError);
- }
}
/**
@@ -792,12 +762,19 @@ public void
testInterruptableSharedLockInInvokeAndCancel() throws Exception {
* resolved by a fatal error.
*/
@Test
- public void testFatalErrorAfterUninterruptibleInvoke() throws Exception
{
- Configuration config = new Configuration();
+ public void testFatalErrorAfterUnInterruptibleInvoke() throws Exception
{
+ final AwaitFatalErrorTaskManagerActions taskManagerActions =
+ new AwaitFatalErrorTaskManagerActions();
+
+ final Configuration config = new Configuration();
config.setLong(TaskManagerOptions.TASK_CANCELLATION_INTERVAL,
5);
config.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT,
50);
- Task task =
createTask(InvokableUninterruptibleBlockingInvoke.class, config);
+ final Task task = new TaskBuilder()
+
.setInvokable(InvokableUnInterruptibleBlockingInvoke.class)
+ .setTaskManagerConfig(config)
+ .setTaskManagerActions(taskManagerActions)
+ .build();
try {
task.startTaskThread();
@@ -806,17 +783,11 @@ public void testFatalErrorAfterUninterruptibleInvoke()
throws Exception {
task.cancelExecution();
- for (int i = 0; i < 10; i++) {
- Object msg = taskManagerMessages.poll(1,
TimeUnit.SECONDS);
- if (msg instanceof
TaskManagerMessages.FatalError) {
- return; // success
- }
- }
-
- fail("Did not receive expected task manager message");
+ // wait for the notification of notifyFatalError
+ taskManagerActions.latch.await();
} finally {
// Interrupt again to clean up Thread
- cancelLatch.trigger();
+ triggerLatch.trigger();
task.getExecutingThread().interrupt();
task.getExecutingThread().join();
}
@@ -830,15 +801,19 @@ public void testTaskConfig() throws Exception {
long interval = 28218123;
long timeout = interval + 19292;
- Configuration config = new Configuration();
+ final Configuration config = new Configuration();
config.setLong(TaskManagerOptions.TASK_CANCELLATION_INTERVAL,
interval);
config.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT,
timeout);
- ExecutionConfig executionConfig = new ExecutionConfig();
+ final ExecutionConfig executionConfig = new ExecutionConfig();
executionConfig.setTaskCancellationInterval(interval + 1337);
executionConfig.setTaskCancellationTimeout(timeout - 1337);
- Task task = createTask(InvokableBlockingInInvoke.class, config,
executionConfig);
+ final Task task = new TaskBuilder()
+ .setInvokable(InvokableBlockingInInvoke.class)
+ .setTaskManagerConfig(config)
+ .setExecutionConfig(executionConfig)
+ .build();
assertEquals(interval, task.getTaskCancellationInterval());
assertEquals(timeout, task.getTaskCancellationTimeout());
@@ -855,6 +830,69 @@ public void testTaskConfig() throws Exception {
}
//
------------------------------------------------------------------------
+ // customized TaskManagerActions
+ //
------------------------------------------------------------------------
+
+ /**
+ * Customized TaskManagerActions that queues all calls of
updateTaskExecutionState
+ */
+ private class QueuedNoOpTaskManagerActions extends
NoOpTaskManagerActions {
+ private final BlockingQueue<TaskExecutionState> queue = new
LinkedBlockingDeque<>();
+
+ @Override
+ public void updateTaskExecutionState(TaskExecutionState
taskExecutionState) {
+ queue.offer(taskExecutionState);
+ }
+
+ private void validateListenerMessage(ExecutionState state, Task
task, Throwable error) {
+ try {
+ // we may have to wait for a bit to give the
actors time to receive the message
+ // and put it into the queue
+ final TaskExecutionState taskState =
queue.take();
+ assertNotNull("There is no additional listener
message", state);
+
+ assertEquals(task.getJobID(),
taskState.getJobID());
+ assertEquals(task.getExecutionId(),
taskState.getID());
+ assertEquals(state,
taskState.getExecutionState());
+
+ final Throwable t =
taskState.getError(getClass().getClassLoader());
+ if (error == null) {
+ assertNull(t);
+ } else {
+ assertEquals(error.toString(),
t.toString());
+ }
+ } catch (InterruptedException e) {
+ fail("interrupted");
+ }
+ }
+ }
+
+ /**
+ * Customized TaskManagerActions that ensures no call of
notifyFatalError
+ */
+ private class ProhibitFatalErrorTaskManagerActions extends
NoOpTaskManagerActions {
+ @Override
+ public void notifyFatalError(String message, Throwable cause) {
+ throw new RuntimeException("Unexpected FatalError
notification");
+ }
+ }
+
+ /**
+ * Customized TaskManagerActions that waits for a call of
notifyFatalError
+ */
+ private class AwaitFatalErrorTaskManagerActions extends
NoOpTaskManagerActions {
+ private final OneShotLatch latch = new OneShotLatch();
+
+ @Override
+ public void notifyFatalError(String message, Throwable cause) {
+ latch.trigger();
+ }
+ }
+
+ //
------------------------------------------------------------------------
+ // helper functions
+ //
------------------------------------------------------------------------
+
private void setInputGate(Task task, SingleInputGate inputGate) {
try {
@@ -885,232 +923,159 @@ private void setState(Task task, ExecutionState state) {
}
}
- /**
- * Creates a {@link BlobCacheService} mock that is suitable to be used
in the tests above.
- *
- * @return BlobCache mock with the bare minimum of implemented
functions that work
- */
- private BlobCacheService createBlobCache() {
- return new BlobCacheService(
- mock(PermanentBlobCache.class),
- mock(TransientBlobCache.class));
- }
+ private final class TaskBuilder {
+ private Class<? extends AbstractInvokable> invokable;
+ private TaskManagerActions taskManagerActions;
+ private LibraryCacheManager libraryCacheManager;
+ private ResultPartitionConsumableNotifier consumableNotifier;
+ private PartitionProducerStateChecker
partitionProducerStateChecker;
+ private NetworkEnvironment networkEnvironment;
+ private Executor executor;
+ private Configuration taskManagerConfig;
+ private ExecutionConfig executionConfig;
+ private Collection<PermanentBlobKey> requiredJarFileBlobKeys;
- private Task createTask(Class<? extends AbstractInvokable> invokable)
throws IOException {
- return createTask(invokable, new Configuration(), new
ExecutionConfig());
- }
+ {
+ invokable = TestInvokableCorrect.class;
+ taskManagerActions = mock(TaskManagerActions.class);
- private Task createTask(Class<? extends AbstractInvokable> invokable,
Configuration config) throws IOException {
- BlobCacheService blobService = createBlobCache();
- LibraryCacheManager libCache = mock(LibraryCacheManager.class);
-
when(libCache.getClassLoader(any(JobID.class))).thenReturn(getClass().getClassLoader());
- return createTask(invokable, blobService, libCache, config, new
ExecutionConfig());
- }
+ libraryCacheManager = mock(LibraryCacheManager.class);
+
when(libraryCacheManager.getClassLoader(any(JobID.class))).thenReturn(getClass().getClassLoader());
- private Task createTask(Class<? extends AbstractInvokable> invokable,
Configuration config, ExecutionConfig execConfig) throws IOException {
- BlobCacheService blobService = createBlobCache();
- LibraryCacheManager libCache = mock(LibraryCacheManager.class);
-
when(libCache.getClassLoader(any(JobID.class))).thenReturn(getClass().getClassLoader());
- return createTask(invokable, blobService, libCache, config,
execConfig);
- }
+ consumableNotifier =
mock(ResultPartitionConsumableNotifier.class);
+ partitionProducerStateChecker =
mock(PartitionProducerStateChecker.class);
- private Task createTask(
- Class<? extends AbstractInvokable> invokable,
- BlobCacheService blobService,
- LibraryCacheManager libCache) throws IOException {
+ final ResultPartitionManager partitionManager =
mock(ResultPartitionManager.class);
+ final TaskEventDispatcher taskEventDispatcher =
mock(TaskEventDispatcher.class);
+ networkEnvironment = mock(NetworkEnvironment.class);
+
when(networkEnvironment.getResultPartitionManager()).thenReturn(partitionManager);
+
when(networkEnvironment.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC);
+
when(networkEnvironment.createKvStateTaskRegistry(any(JobID.class),
any(JobVertexID.class)))
+ .thenReturn(mock(TaskKvStateRegistry.class));
+
when(networkEnvironment.getTaskEventDispatcher()).thenReturn(taskEventDispatcher);
- return createTask(invokable, blobService, libCache, new
Configuration(), new ExecutionConfig());
- }
+ executor = TestingUtils.defaultExecutor();
- private Task createTask(
- Class<? extends AbstractInvokable> invokable,
- BlobCacheService blobService,
- LibraryCacheManager libCache,
- Configuration config,
- ExecutionConfig execConfig) throws IOException {
-
- ResultPartitionManager partitionManager =
mock(ResultPartitionManager.class);
- ResultPartitionConsumableNotifier consumableNotifier =
mock(ResultPartitionConsumableNotifier.class);
- PartitionProducerStateChecker partitionProducerStateChecker =
mock(PartitionProducerStateChecker.class);
- TaskEventDispatcher taskEventDispatcher =
mock(TaskEventDispatcher.class);
- Executor executor = mock(Executor.class);
- NetworkEnvironment network = mock(NetworkEnvironment.class);
-
when(network.getResultPartitionManager()).thenReturn(partitionManager);
-
when(network.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC);
- when(network.createKvStateTaskRegistry(any(JobID.class),
any(JobVertexID.class)))
- .thenReturn(mock(TaskKvStateRegistry.class));
-
when(network.getTaskEventDispatcher()).thenReturn(taskEventDispatcher);
+ taskManagerConfig = new Configuration();
+ executionConfig = new ExecutionConfig();
- return createTask(invokable, blobService, libCache, network,
consumableNotifier, partitionProducerStateChecker, executor, config,
execConfig);
- }
+ requiredJarFileBlobKeys = Collections.emptyList();
+ }
- private Task createTask(
- Class<? extends AbstractInvokable> invokable,
- BlobCacheService blobService,
- LibraryCacheManager libCache,
- NetworkEnvironment networkEnvironment,
- ResultPartitionConsumableNotifier consumableNotifier,
- PartitionProducerStateChecker
partitionProducerStateChecker,
- Executor executor) throws IOException {
- return createTask(invokable, blobService, libCache,
networkEnvironment, consumableNotifier, partitionProducerStateChecker,
executor, new Configuration(), new ExecutionConfig());
- }
+ TaskBuilder setInvokable(Class<? extends AbstractInvokable>
invokable) {
+ this.invokable = invokable;
+ return this;
+ }
- private Task createTask(
- Class<? extends AbstractInvokable> invokable,
- BlobCacheService blobService,
- LibraryCacheManager libCache,
- NetworkEnvironment networkEnvironment,
- ResultPartitionConsumableNotifier consumableNotifier,
- PartitionProducerStateChecker partitionProducerStateChecker,
- Executor executor,
- Configuration taskManagerConfig,
- ExecutionConfig execConfig) throws IOException {
-
- JobID jobId = new JobID();
- JobVertexID jobVertexId = new JobVertexID();
- ExecutionAttemptID executionAttemptId = new
ExecutionAttemptID();
-
- InputSplitProvider inputSplitProvider = new
TaskInputSplitProvider(
- jobManagerGateway,
- jobId,
- jobVertexId,
- executionAttemptId,
- new FiniteDuration(60, TimeUnit.SECONDS));
-
- CheckpointResponder checkpointResponder = new
ActorGatewayCheckpointResponder(jobManagerGateway);
-
- SerializedValue<ExecutionConfig> serializedExecutionConfig =
new SerializedValue<>(execConfig);
-
- JobInformation jobInformation = new JobInformation(
- jobId,
- "Test Job",
- serializedExecutionConfig,
- new Configuration(),
- Collections.emptyList(),
- Collections.emptyList());
-
- TaskInformation taskInformation = new TaskInformation(
- jobVertexId,
- "Test Task",
- 1,
- 1,
- invokable.getName(),
- new Configuration());
-
- TaskMetricGroup taskMetricGroup = mock(TaskMetricGroup.class);
-
when(taskMetricGroup.getIOMetricGroup()).thenReturn(mock(TaskIOMetricGroup.class));
-
- return new Task(
- jobInformation,
- taskInformation,
- executionAttemptId,
- new AllocationID(),
- 0,
- 0,
- Collections.emptyList(),
- Collections.emptyList(),
- 0,
- mock(MemoryManager.class),
- mock(IOManager.class),
- networkEnvironment,
- mock(BroadcastVariableManager.class),
- new TestTaskStateManager(),
- taskManagerConnection,
- inputSplitProvider,
- checkpointResponder,
- blobService,
- libCache,
- mock(FileCache.class),
- new TestingTaskManagerRuntimeInfo(taskManagerConfig),
- taskMetricGroup,
- consumableNotifier,
- partitionProducerStateChecker,
- executor);
- }
+ TaskBuilder setTaskManagerActions(TaskManagerActions
taskManagerActions) {
+ this.taskManagerActions = taskManagerActions;
+ return this;
+ }
- //
------------------------------------------------------------------------
- // Validation Methods
- //
------------------------------------------------------------------------
+ TaskBuilder setLibraryCacheManager(LibraryCacheManager
libraryCacheManager) {
+ this.libraryCacheManager = libraryCacheManager;
+ return this;
+ }
- private void validateUnregisterTask(ExecutionAttemptID id) {
- try {
- // we may have to wait for a bit to give the actors
time to receive the message
- // and put it into the queue
- Object rawMessage = taskManagerMessages.take();
+ TaskBuilder
setConsumableNotifier(ResultPartitionConsumableNotifier consumableNotifier) {
+ this.consumableNotifier = consumableNotifier;
+ return this;
+ }
- assertNotNull("There is no additional TaskManager
message", rawMessage);
- if (!(rawMessage instanceof
TaskMessages.TaskInFinalState)) {
- fail("TaskManager message is not
'UnregisterTask', but " + rawMessage.getClass());
- }
+ TaskBuilder
setPartitionProducerStateChecker(PartitionProducerStateChecker
partitionProducerStateChecker) {
+ this.partitionProducerStateChecker =
partitionProducerStateChecker;
+ return this;
+ }
- TaskMessages.TaskInFinalState message =
(TaskMessages.TaskInFinalState) rawMessage;
- assertEquals(id, message.executionID());
+ TaskBuilder setNetworkEnvironment(NetworkEnvironment
networkEnvironment) {
+ this.networkEnvironment = networkEnvironment;
+ return this;
}
- catch (InterruptedException e) {
- fail("interrupted");
+
+ TaskBuilder setExecutor(Executor executor) {
+ this.executor = executor;
+ return this;
}
- }
- private void validateTaskManagerStateChange(ExecutionState state, Task
task, boolean hasError) {
- try {
- // we may have to wait for a bit to give the actors
time to receive the message
- // and put it into the queue
- Object rawMessage = taskManagerMessages.take();
+ TaskBuilder setTaskManagerConfig(Configuration
taskManagerConfig) {
+ this.taskManagerConfig = taskManagerConfig;
+ return this;
+ }
- assertNotNull("There is no additional TaskManager
message", rawMessage);
- if (!(rawMessage instanceof
TaskMessages.UpdateTaskExecutionState)) {
- fail("TaskManager message is not
'UpdateTaskExecutionState', but " + rawMessage.getClass());
- }
+ TaskBuilder setExecutionConfig(ExecutionConfig executionConfig)
{
+ this.executionConfig = executionConfig;
+ return this;
+ }
- TaskMessages.UpdateTaskExecutionState message =
- (TaskMessages.UpdateTaskExecutionState)
rawMessage;
+ TaskBuilder
setRequiredJarFileBlobKeys(Collection<PermanentBlobKey>
requiredJarFileBlobKeys) {
+ this.requiredJarFileBlobKeys = requiredJarFileBlobKeys;
+ return this;
+ }
- TaskExecutionState taskState =
message.taskExecutionState();
+ private Task build() throws Exception {
+ final JobID jobId = new JobID();
+ final JobVertexID jobVertexId = new JobVertexID();
+ final ExecutionAttemptID executionAttemptId = new
ExecutionAttemptID();
- assertEquals(task.getJobID(), taskState.getJobID());
- assertEquals(task.getExecutionId(), taskState.getID());
- assertEquals(state, taskState.getExecutionState());
+ final SerializedValue<ExecutionConfig>
serializedExecutionConfig = new SerializedValue<>(executionConfig);
- if (hasError) {
-
assertNotNull(taskState.getError(getClass().getClassLoader()));
- } else {
-
assertNull(taskState.getError(getClass().getClassLoader()));
- }
- }
- catch (InterruptedException e) {
- fail("interrupted");
- }
- }
+ final JobInformation jobInformation = new
JobInformation(
+ jobId,
+ "Test Job",
+ serializedExecutionConfig,
+ new Configuration(),
+ requiredJarFileBlobKeys,
+ Collections.emptyList());
- private void validateListenerMessage(ExecutionState state, Task task,
boolean hasError) {
- try {
- // we may have to wait for a bit to give the actors
time to receive the message
- // and put it into the queue
- final TaskExecutionState taskState =
listenerMessages.take();
- assertNotNull("There is no additional listener
message", state);
-
- assertEquals(task.getJobID(), taskState.getJobID());
- assertEquals(task.getExecutionId(), taskState.getID());
- assertEquals(state, taskState.getExecutionState());
-
- if (hasError) {
-
assertNotNull(taskState.getError(getClass().getClassLoader()));
- } else {
-
assertNull(taskState.getError(getClass().getClassLoader()));
- }
- }
- catch (InterruptedException e) {
- fail("interrupted");
+ final TaskInformation taskInformation = new
TaskInformation(
+ jobVertexId,
+ "Test Task",
+ 1,
+ 1,
+ invokable.getName(),
+ new Configuration());
+
+ final BlobCacheService blobCacheService = new
BlobCacheService(
+ mock(PermanentBlobCache.class),
+ mock(TransientBlobCache.class));
+
+ final TaskMetricGroup taskMetricGroup =
mock(TaskMetricGroup.class);
+
when(taskMetricGroup.getIOMetricGroup()).thenReturn(mock(TaskIOMetricGroup.class));
+
+ return new Task(
+ jobInformation,
+ taskInformation,
+ executionAttemptId,
+ new AllocationID(),
+ 0,
+ 0,
+ Collections.emptyList(),
+ Collections.emptyList(),
+ 0,
+ mock(MemoryManager.class),
+ mock(IOManager.class),
+ networkEnvironment,
+ mock(BroadcastVariableManager.class),
+ new TestTaskStateManager(),
+ taskManagerActions,
+ new MockInputSplitProvider(),
+ new TestCheckpointResponder(),
+ blobCacheService,
+ libraryCacheManager,
+ mock(FileCache.class),
+ new
TestingTaskManagerRuntimeInfo(taskManagerConfig),
+ taskMetricGroup,
+ consumableNotifier,
+ partitionProducerStateChecker,
+ executor);
}
}
- //
--------------------------------------------------------------------------------------------
- // Mock invokable code
- //
--------------------------------------------------------------------------------------------
-
- /** Test task class. */
- public static final class TestInvokableCorrect extends
AbstractInvokable {
+ //
------------------------------------------------------------------------
+ // test task classes
+ //
------------------------------------------------------------------------
+ private static final class TestInvokableCorrect extends
AbstractInvokable {
public TestInvokableCorrect(Environment environment) {
super(environment);
}
@@ -1119,14 +1084,18 @@ public TestInvokableCorrect(Environment environment) {
public void invoke() {}
@Override
- public void cancel() throws Exception {
+ public void cancel() {
fail("This should not be called");
}
}
- /** Test task class. */
- public static final class InvokableWithExceptionInInvoke extends
AbstractInvokable {
+ private abstract static class InvokableNonInstantiable extends
AbstractInvokable {
+ public InvokableNonInstantiable(Environment environment) {
+ super(environment);
+ }
+ }
+ private static final class InvokableWithExceptionInInvoke extends
AbstractInvokable {
public InvokableWithExceptionInInvoke(Environment environment) {
super(environment);
}
@@ -1137,44 +1106,21 @@ public void invoke() throws Exception {
}
}
- /** Test task class. */
- public static final class InvokableWithExceptionOnTrigger extends
AbstractInvokable {
-
- public InvokableWithExceptionOnTrigger(Environment environment)
{
+ private static final class FailingInvokableWithChainedException extends
AbstractInvokable {
+ public FailingInvokableWithChainedException(Environment
environment) {
super(environment);
}
@Override
public void invoke() {
- awaitLatch.trigger();
-
- // make sure that the interrupt call does not
- // grab us out of the lock early
- while (true) {
- try {
- triggerLatch.await();
- break;
- }
- catch (InterruptedException e) {
- // fall through the loop
- }
- }
-
- throw new RuntimeException("test");
+ throw new TestWrappedException(new IOException("test"));
}
- }
-
- /** Test task class. */
- public abstract static class InvokableNonInstantiable extends
AbstractInvokable {
- public InvokableNonInstantiable(Environment environment) {
- super(environment);
- }
+ @Override
+ public void cancel() {}
}
- /** Test task class. */
- public static final class InvokableBlockingInInvoke extends
AbstractInvokable {
-
+ private static final class InvokableBlockingInInvoke extends
AbstractInvokable {
public InvokableBlockingInInvoke(Environment environment) {
super(environment);
}
@@ -1190,64 +1136,60 @@ public void invoke() throws Exception {
}
}
- /** Test task class. */
- public static final class InvokableWithCancelTaskExceptionInInvoke
extends AbstractInvokable {
-
- public InvokableWithCancelTaskExceptionInInvoke(Environment
environment) {
+ public static final class InvokableWithExceptionOnTrigger extends
AbstractInvokable {
+ public InvokableWithExceptionOnTrigger(Environment environment)
{
super(environment);
}
@Override
- public void invoke() throws Exception {
+ public void invoke() {
awaitLatch.trigger();
- try {
- triggerLatch.await();
+ // make sure that the interrupt call does not
+ // grab us out of the lock early
+ while (true) {
+ try {
+ triggerLatch.await();
+ break;
+ }
+ catch (InterruptedException e) {
+ // fall through the loop
+ }
}
- catch (Throwable ignored) {}
- throw new CancelTaskException();
+ throw new RuntimeException("test");
}
}
- /** Test task class. */
- public static final class
InvokableInterruptableSharedLockInInvokeAndCancel extends AbstractInvokable {
-
- private final Object lock = new Object();
-
- public
InvokableInterruptableSharedLockInInvokeAndCancel(Environment environment) {
+ public static final class InvokableWithCancelTaskExceptionInInvoke
extends AbstractInvokable {
+ public InvokableWithCancelTaskExceptionInInvoke(Environment
environment) {
super(environment);
}
@Override
- public void invoke() throws Exception {
- synchronized (lock) {
- awaitLatch.trigger();
- wait();
- }
- }
+ public void invoke() {
+ awaitLatch.trigger();
- @Override
- public void cancel() throws Exception {
- synchronized (lock) {
- cancelLatch.trigger();
+ try {
+ triggerLatch.await();
}
+ catch (Throwable ignored) {}
+
+ throw new CancelTaskException();
}
}
- /** Test task class. */
public static final class InvokableBlockingInCancel extends
AbstractInvokable {
-
public InvokableBlockingInCancel(Environment environment) {
super(environment);
}
@Override
- public void invoke() throws Exception {
+ public void invoke() {
awaitLatch.trigger();
try {
- cancelLatch.await();
+ triggerLatch.await(); // await cancel
synchronized (this) {
wait();
}
@@ -1261,51 +1203,57 @@ public void invoke() throws Exception {
@Override
public void cancel() throws Exception {
synchronized (this) {
- cancelLatch.trigger();
+ triggerLatch.trigger();
wait();
}
}
}
- /** Test task class. */
- public static final class InvokableUninterruptibleBlockingInvoke
extends AbstractInvokable {
+ public static final class
InvokableInterruptibleSharedLockInInvokeAndCancel extends AbstractInvokable {
+ private final Object lock = new Object();
- public InvokableUninterruptibleBlockingInvoke(Environment
environment) {
+ public
InvokableInterruptibleSharedLockInInvokeAndCancel(Environment environment) {
super(environment);
}
@Override
public void invoke() throws Exception {
- while (!cancelLatch.isTriggered()) {
- try {
- synchronized (this) {
- awaitLatch.trigger();
- wait();
- }
- } catch (InterruptedException ignored) {
- }
+ synchronized (lock) {
+ awaitLatch.trigger();
+ wait();
}
}
@Override
- public void cancel() throws Exception {
+ public void cancel() {
+ synchronized (lock) {
+ // do nothing but a placeholder
+ triggerLatch.trigger();
+ }
}
}
- /** Test task class. */
- public static final class FailingInvokableWithChainedException extends
AbstractInvokable {
-
- public FailingInvokableWithChainedException(Environment
environment) {
+ public static final class InvokableUnInterruptibleBlockingInvoke
extends AbstractInvokable {
+ public InvokableUnInterruptibleBlockingInvoke(Environment
environment) {
super(environment);
}
@Override
- public void invoke() throws Exception {
- throw new TestWrappedException(new IOException("test"));
+ public void invoke() {
+ while (!triggerLatch.isTriggered()) {
+ try {
+ synchronized (this) {
+ awaitLatch.trigger();
+ wait();
+ }
+ } catch (InterruptedException ignored) {
+ }
+ }
}
@Override
- public void cancel() {}
+ public void cancel() {
+ }
}
//
------------------------------------------------------------------------
@@ -1315,7 +1263,7 @@ public void cancel() {}
private static class TestWrappedException extends
WrappingRuntimeException {
private static final long serialVersionUID = 1L;
- public TestWrappedException(@Nonnull Throwable cause) {
+ TestWrappedException(@Nonnull Throwable cause) {
super(cause);
}
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Port TaskTest to new code base
> ------------------------------
>
> Key: FLINK-10426
> URL: https://issues.apache.org/jira/browse/FLINK-10426
> Project: Flink
> Issue Type: Sub-task
> Components: Tests
> Affects Versions: 1.7.0
> Reporter: TisonKun
> Assignee: TisonKun
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Port {{TaskTest}} to new code base
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)