This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 83d8e29f6d11354eafcfd5428535a65efffc3a80 Author: Anton Kalashnikov <[email protected]> AuthorDate: Thu Jan 26 17:26:59 2023 +0100 [FLINK-15550][runtime] The static latches in TaskTest were replaced by latches from invokable objects. --- .../apache/flink/runtime/taskmanager/TaskTest.java | 192 +++++++++++++-------- 1 file changed, 123 insertions(+), 69 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java index fd925ec60b2..8e728bb87d7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java @@ -39,6 +39,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.io.network.partition.consumer.RemoteChannelStateChecker; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobgraph.tasks.TaskInvokable; import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException; import org.apache.flink.runtime.operators.testutils.ExpectedTestException; import org.apache.flink.runtime.shuffle.PartitionDescriptor; @@ -76,6 +77,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; +import static org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.is; @@ -100,9 +102,6 @@ import static org.mockito.Mockito.when; public class TaskTest extends TestLogger { private static final String RESTORE_EXCEPTION_MSG = "TestExceptionInRestore"; - private static OneShotLatch awaitLatch; - private static OneShotLatch triggerLatch; - private ShuffleEnvironment<?, ?> shuffleEnvironment; @ClassRule @@ -115,9 +114,6 @@ public class TaskTest extends TestLogger { @Before public void setup() { - awaitLatch = new OneShotLatch(); - triggerLatch = new OneShotLatch(); - shuffleEnvironment = new NettyShuffleEnvironmentBuilder().build(); wasCleanedUp = false; } @@ -154,7 +150,7 @@ public class TaskTest extends TestLogger { .setInvokable(InvokableBlockingInRestore.class) .build(Executors.directExecutor()); task.startTaskThread(); - awaitLatch.await(); + awaitInvokableLatch(task); task.cancelExecution(); task.getExecutingThread().join(); assertTrue(wasCleanedUp); @@ -445,7 +441,7 @@ public class TaskTest extends TestLogger { task.startTaskThread(); // wait till the task is in restore - awaitLatch.await(); + awaitInvokableLatch(task); task.cancelExecution(); assertTrue( @@ -475,7 +471,7 @@ public class TaskTest extends TestLogger { task.startTaskThread(); // wait till the task is in invoke - awaitLatch.await(); + awaitInvokableLatch(task); task.cancelExecution(); assertTrue( @@ -506,7 +502,7 @@ public class TaskTest extends TestLogger { task.startTaskThread(); // wait till the task is in invoke - awaitLatch.await(); + awaitInvokableLatch(task); task.failExternally(new Exception(RESTORE_EXCEPTION_MSG)); @@ -534,7 +530,7 @@ public class TaskTest extends TestLogger { task.startTaskThread(); // wait till the task is in invoke - awaitLatch.await(); + awaitInvokableLatch(task); task.failExternally(new Exception("test")); @@ -587,13 +583,13 @@ public class TaskTest extends TestLogger { task.startTaskThread(); // wait till the task is in invoke - awaitLatch.await(); + awaitInvokableLatch(task); task.cancelExecution(); assertEquals(ExecutionState.CANCELING, task.getExecutionState()); // this causes an exception - triggerLatch.trigger(); + triggerInvokableLatch(task); task.getExecutingThread().join(); @@ -620,13 +616,13 @@ public class TaskTest extends TestLogger { task.startTaskThread(); // wait till the task is in invoke - awaitLatch.await(); + awaitInvokableLatch(task); task.failExternally(new Exception("external")); assertEquals(ExecutionState.FAILED, task.getExecutionState()); // this causes an exception - triggerLatch.trigger(); + triggerInvokableLatch(task); task.getExecutingThread().join(); @@ -647,11 +643,12 @@ public class TaskTest extends TestLogger { .setInvokable(InvokableWithCancelTaskExceptionInInvoke.class) .build(Executors.directExecutor()); - // Cause CancelTaskException. - triggerLatch.trigger(); + task.startTaskThread(); - task.run(); + // Cause CancelTaskException. + triggerInvokableLatch(task); + task.getExecutingThread().join(); assertEquals(ExecutionState.CANCELED, task.getExecutionState()); } @@ -665,14 +662,14 @@ public class TaskTest extends TestLogger { task.startTaskThread(); // Wait till the task is in invoke. - awaitLatch.await(); + awaitInvokableLatch(task); task.failExternally(new Exception("external")); assertEquals(ExecutionState.FAILED, task.getExecutionState()); // Either we cause the CancelTaskException or the TaskCanceler // by interrupting the invokable. - triggerLatch.trigger(); + triggerInvokableLatch(task); task.getExecutingThread().join(); @@ -834,7 +831,7 @@ public class TaskTest extends TestLogger { try { task.startTaskThread(); - awaitLatch.await(); + awaitInvokableLatch(task); CompletableFuture<ExecutionState> promise = new CompletableFuture<>(); when(partitionChecker.requestPartitionProducerState( @@ -877,7 +874,7 @@ public class TaskTest extends TestLogger { try { task.startTaskThread(); - awaitLatch.await(); + awaitInvokableLatch(task); CompletableFuture<ExecutionState> promise = new CompletableFuture<>(); when(partitionChecker.requestPartitionProducerState( @@ -927,7 +924,7 @@ public class TaskTest extends TestLogger { task.startTaskThread(); - awaitLatch.await(); + awaitInvokableLatch(task); task.cancelExecution(); task.getExecutingThread().join(); @@ -955,7 +952,7 @@ public class TaskTest extends TestLogger { task.startTaskThread(); - awaitLatch.await(); + awaitInvokableLatch(task); task.cancelExecution(); task.getExecutingThread().join(); @@ -987,7 +984,7 @@ public class TaskTest extends TestLogger { try { task.startTaskThread(); - awaitLatch.await(); + awaitInvokableLatch(task); task.cancelExecution(); @@ -996,7 +993,7 @@ public class TaskTest extends TestLogger { assertThat(fatalError, is(notNullValue())); } finally { // Interrupt again to clean up Thread - triggerLatch.trigger(); + triggerInvokableLatch(task); task.getExecutingThread().interrupt(); task.getExecutingThread().join(); } @@ -1016,31 +1013,33 @@ public class TaskTest extends TestLogger { config.setLong(TaskManagerOptions.TASK_CANCELLATION_INTERVAL, 5); config.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 50); - final Task task = - spy( - createTaskBuilder() - .setInvokable(InvokableBlockingWithTrigger.class) - .setTaskManagerConfig(config) - .setTaskManagerActions(taskManagerActions) - .build(Executors.directExecutor())); + // We need to remember the original object since all changes in `startTaskThread` applies + // to it rather than to spy object. + Task task = + createTaskBuilder() + .setInvokable(InvokableBlockingWithTrigger.class) + .setTaskManagerConfig(config) + .setTaskManagerActions(taskManagerActions) + .build(Executors.directExecutor()); + final Task spyTask = spy(task); final Class<OutOfMemoryError> fatalErrorType = OutOfMemoryError.class; doThrow(fatalErrorType) - .when(task) + .when(spyTask) .cancelOrFailAndCancelInvokableInternal(eq(ExecutionState.CANCELING), eq(null)); try { - task.startTaskThread(); + spyTask.startTaskThread(); - awaitLatch.await(); + awaitInvokableLatch(task); - task.cancelExecution(); + spyTask.cancelExecution(); // wait for the notification of notifyFatalError final Throwable fatalError = fatalErrorFuture.join(); assertThat(fatalError, instanceOf(fatalErrorType)); } finally { - triggerLatch.trigger(); + triggerInvokableLatch(task); } } @@ -1070,7 +1069,7 @@ public class TaskTest extends TestLogger { task.startTaskThread(); - awaitLatch.await(); + awaitInvokableLatch(task); assertEquals( executionConfig.getTaskCancellationInterval(), task.getTaskCancellationInterval()); @@ -1093,11 +1092,11 @@ public class TaskTest extends TestLogger { task.startTaskThread(); // wait till the task is in invoke - awaitLatch.await(); + awaitInvokableLatch(task); assertFalse(task.getTerminationFuture().isDone()); - triggerLatch.trigger(); + triggerInvokableLatch(task); task.getExecutingThread().join(); @@ -1162,7 +1161,7 @@ public class TaskTest extends TestLogger { task.startTaskThread(); try { - awaitLatch.await(); + awaitInvokableLatch(task); assertEquals(ExecutionState.RUNNING, task.getExecutionState()); assertCheckpointDeclined( @@ -1181,7 +1180,7 @@ public class TaskTest extends TestLogger { InvokableDeclingingCheckpoints.TRIGGERING_FAILED_CHECKPOINT_ID, CheckpointFailureReason.TASK_FAILURE); } finally { - triggerLatch.trigger(); + triggerInvokableLatch(task); task.getExecutingThread().join(); } assertEquals(ExecutionState.FINISHED, task.getTerminationFuture().getNow(null)); @@ -1211,6 +1210,32 @@ public class TaskTest extends TestLogger { testCheckpointResponder.clear(); } + private TaskInvokable waitForInvokable(Task task) throws Exception { + waitUntilCondition(() -> task.getInvokable() != null, 10L); + + return task.getInvokable(); + } + + private void awaitInvokableLatch(Task task) throws Exception { + TaskInvokable taskInvokable = waitForInvokable(task); + if (!(taskInvokable instanceof AwaitLatchInvokable)) { + throw new Exception( + "Invokable doesn't implement class - " + AwaitLatchInvokable.class.getName()); + } + + ((AwaitLatchInvokable) taskInvokable).await(); + } + + private void triggerInvokableLatch(Task task) throws Exception { + TaskInvokable taskInvokable = waitForInvokable(task); + if (!(taskInvokable instanceof TriggerLatchInvokable)) { + throw new Exception( + "Invokable doesn't implement class - " + TriggerLatchInvokable.class.getName()); + } + + ((TriggerLatchInvokable) taskInvokable).trigger(); + } + // ------------------------------------------------------------------------ // customized TaskManagerActions // ------------------------------------------------------------------------ @@ -1343,7 +1368,7 @@ public class TaskTest extends TestLogger { public void cancel() {} } - private static class InvokableBlockingWithTrigger extends AbstractInvokable { + private static class InvokableBlockingWithTrigger extends TriggerLatchInvokable { public InvokableBlockingWithTrigger(Environment environment) { super(environment); } @@ -1385,7 +1410,7 @@ public class TaskTest extends TestLogger { } } - private static final class InvokableBlockingInInvoke extends AbstractInvokable { + private static final class InvokableBlockingInInvoke extends AwaitLatchInvokable { public InvokableBlockingInInvoke(Environment environment) { super(environment); } @@ -1403,7 +1428,7 @@ public class TaskTest extends TestLogger { } } - private static final class InvokableBlockingInRestore extends AbstractInvokable { + private static final class InvokableBlockingInRestore extends AwaitLatchInvokable { public InvokableBlockingInRestore(Environment environment) { super(environment); } @@ -1431,7 +1456,7 @@ public class TaskTest extends TestLogger { } /** {@link AbstractInvokable} which throws {@link RuntimeException} on invoke. */ - public static final class InvokableWithExceptionOnTrigger extends AbstractInvokable { + public static final class InvokableWithExceptionOnTrigger extends TriggerLatchInvokable { public InvokableWithExceptionOnTrigger(Environment environment) { super(environment); } @@ -1444,23 +1469,9 @@ public class TaskTest extends TestLogger { } } - private static void awaitTriggerLatch() { - awaitLatch.trigger(); - - // make sure that the interrupt call does not - // grab us out of the lock early - while (true) { - try { - triggerLatch.await(); - break; - } catch (InterruptedException e) { - // fall through the loop - } - } - } - /** {@link AbstractInvokable} which throws {@link CancelTaskException} on invoke. */ - public static final class InvokableWithCancelTaskExceptionInInvoke extends AbstractInvokable { + public static final class InvokableWithCancelTaskExceptionInInvoke + extends TriggerLatchInvokable { public InvokableWithCancelTaskExceptionInInvoke(Environment environment) { super(environment); } @@ -1474,7 +1485,7 @@ public class TaskTest extends TestLogger { } /** {@link AbstractInvokable} which blocks in cancel. */ - public static final class InvokableBlockingInCancel extends AbstractInvokable { + public static final class InvokableBlockingInCancel extends TriggerLatchInvokable { public InvokableBlockingInCancel(Environment environment) { super(environment); } @@ -1506,7 +1517,7 @@ public class TaskTest extends TestLogger { /** {@link AbstractInvokable} which blocks in cancel and is interruptible. */ public static final class InvokableInterruptibleSharedLockInInvokeAndCancel - extends AbstractInvokable { + extends TriggerLatchInvokable { private final Object lock = new Object(); public InvokableInterruptibleSharedLockInInvokeAndCancel(Environment environment) { @@ -1515,9 +1526,11 @@ public class TaskTest extends TestLogger { @Override public void invoke() throws Exception { - synchronized (lock) { - awaitLatch.trigger(); - wait(); + while (!triggerLatch.isTriggered()) { + synchronized (lock) { + awaitLatch.trigger(); + lock.wait(); + } } } @@ -1531,7 +1544,7 @@ public class TaskTest extends TestLogger { } /** {@link AbstractInvokable} which blocks in cancel and is not interruptible. */ - public static final class InvokableUnInterruptibleBlockingInvoke extends AbstractInvokable { + public static final class InvokableUnInterruptibleBlockingInvoke extends TriggerLatchInvokable { public InvokableUnInterruptibleBlockingInvoke(Environment environment) { super(environment); } @@ -1564,4 +1577,45 @@ public class TaskTest extends TestLogger { super(cause); } } + + private abstract static class AwaitLatchInvokable extends AbstractInvokable { + + final OneShotLatch awaitLatch = new OneShotLatch(); + + public AwaitLatchInvokable(Environment environment) { + super(environment); + } + + void await() throws InterruptedException { + awaitLatch.await(); + } + } + + private abstract static class TriggerLatchInvokable extends AwaitLatchInvokable { + + final OneShotLatch triggerLatch = new OneShotLatch(); + + public TriggerLatchInvokable(Environment environment) { + super(environment); + } + + void trigger() { + triggerLatch.trigger(); + } + + void awaitTriggerLatch() { + awaitLatch.trigger(); + + // make sure that the interrupt call does not + // grab us out of the lock early + while (true) { + try { + triggerLatch.await(); + break; + } catch (InterruptedException e) { + // fall through the loop + } + } + } + } }
