This is an automated email from the ASF dual-hosted git repository.
yuanmei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new fb2d690 [FLINK-23558][streaming] Ignoring RejectedExecutionException
during s… (#16653)
fb2d690 is described below
commit fb2d690f2c91c459e3a8ae1fa4b9c84ea4198956
Author: Anton Kalashnikov <[email protected]>
AuthorDate: Thu Aug 5 05:10:41 2021 +0200
[FLINK-23558][streaming] Ignoring RejectedExecutionException during s…
(#16653)
* [FLINK-23558][streaming] Ignoring RejectedExecutionException during
submitting the throughput calculation
* [hotfix] Clarification of execution thread of actionExecutor in javadoc
---
.../flink/streaming/runtime/tasks/StreamTask.java | 17 +++-----
.../streaming/runtime/tasks/StreamTaskTest.java | 51 ++++++++++++++++++++++
2 files changed, 56 insertions(+), 12 deletions(-)
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 858b868..e68df16 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -180,7 +180,8 @@ public abstract class StreamTask<OUT, OP extends
StreamOperator<OUT>> extends Ab
/**
* All actions outside of the task {@link #mailboxProcessor mailbox} (i.e.
performed by another
* thread) must be executed through this executor to ensure that we don't
have concurrent method
- * calls that void consistent checkpoints.
+ * calls that void consistent checkpoints. The execution will always be
performed in the task
+ * thread.
*
* <p>CheckpointLock is superseded by {@link MailboxExecutor}, with {@link
* StreamTaskActionExecutor.SynchronizedStreamTaskActionExecutor
@@ -685,7 +686,7 @@ public abstract class StreamTask<OUT, OP extends
StreamOperator<OUT>> extends Ab
afterInvoke();
}
- private void throughputCalculationSetup() {
+ void throughputCalculationSetup() {
systemTimerService.registerTimer(
systemTimerService.getCurrentProcessingTime()
+
getTaskConfiguration().get(AUTOMATIC_BUFFER_ADJUSTMENT_PERIOD),
@@ -741,9 +742,6 @@ public abstract class StreamTask<OUT, OP extends
StreamOperator<OUT>> extends Ab
LOG.debug("Finished task {}", getName());
getCompletionFuture().exceptionally(unused -> null).join();
- final CompletableFuture<Void> timersFinishedFuture = new
CompletableFuture<>();
- final CompletableFuture<Void> systemTimersFinishedFuture = new
CompletableFuture<>();
-
// close all operators in a chain effect way
operatorChain.finishOperators(actionExecutor);
finishedOperators = true;
@@ -775,10 +773,9 @@ public abstract class StreamTask<OUT, OP extends
StreamOperator<OUT>> extends Ab
// at the same time, this makes sure that during any "regular" exit
where still
actionExecutor.runThrowing(
() -> {
-
// make sure no new timers can come
- FutureUtils.forward(timerService.quiesce(),
timersFinishedFuture);
- FutureUtils.forward(systemTimerService.quiesce(),
systemTimersFinishedFuture);
+ timerService.quiesce().get();
+ systemTimerService.quiesce().get();
// let mailbox execution reject all new letters from this
point
mailboxProcessor.prepareClose();
@@ -796,10 +793,6 @@ public abstract class StreamTask<OUT, OP extends
StreamOperator<OUT>> extends Ab
isRunning = false;
});
- // make sure all timers finish
- timersFinishedFuture.get();
- systemTimersFinishedFuture.get();
-
LOG.debug("Closed operators for task {}", getName());
// make sure all buffered data is flushed
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 15c77f8..b9741e1 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -1637,6 +1637,57 @@ public class StreamTaskTest extends TestLogger {
}
@Test
+ public void
testQuiesceOfMailboxRightBeforeSubmittingActionViaTimerService() throws
Exception {
+ // given: the stream task with configured handle async exception.
+ AtomicBoolean submitThroughputFail = new AtomicBoolean();
+ MockEnvironment mockEnvironment = new MockEnvironmentBuilder().build();
+
+ final UnAvailableTestInputProcessor inputProcessor = new
UnAvailableTestInputProcessor();
+ RunningTask<StreamTask<?, ?>> task =
+ runTask(
+ () ->
+ new MockStreamTaskBuilder(mockEnvironment)
+ .setHandleAsyncException(
+ (str, t) ->
submitThroughputFail.set(true))
+
.setStreamInputProcessor(inputProcessor)
+ .build());
+
+ waitTaskIsRunning(task.streamTask, task.invocationFuture);
+
+ TimerService timerService = task.streamTask.systemTimerService;
+ MailboxExecutor mainMailboxExecutor =
+ task.streamTask.mailboxProcessor.getMainMailboxExecutor();
+
+ CountDownLatch stoppingMailboxLatch = new CountDownLatch(1);
+ timerService.registerTimer(
+ timerService.getCurrentProcessingTime(),
+ (time) -> {
+ stoppingMailboxLatch.await();
+ // The time to the start 'afterInvoke' inside of mailbox.
+ // 'afterInvoke' won't finish until this execution won't
finish so it is
+ // impossible to wait on latch or something else.
+ Thread.sleep(5);
+ mainMailboxExecutor.submit(() -> {}, "test");
+ });
+
+ // when: Calling the quiesce for mailbox and finishing the timer
service.
+ mainMailboxExecutor
+ .submit(
+ () -> {
+ stoppingMailboxLatch.countDown();
+ task.streamTask.afterInvoke();
+ },
+ "test")
+ .get();
+
+ // then: the exception handle wasn't invoked because the such
situation is expected.
+ assertFalse(submitThroughputFail.get());
+
+ // Correctly shutdown the stream task to avoid hanging.
+
inputProcessor.availabilityProvider.getUnavailableToResetAvailable().complete(null);
+ }
+
+ @Test
public void testTaskAvoidHangingAfterSnapshotStateThrownException() throws
Exception {
// given: Configured SourceStreamTask with source which fails on
checkpoint.
Configuration taskManagerConfig = new Configuration();