This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.14 by this push:
new 10e63412c7d [FLINK-25454][runtime] Pause and resume time for throughput calculator only from one thread.
10e63412c7d is described below
commit 10e63412c7d82b21c82072bd40413195ca3dbf31
Author: Anton Kalashnikov <[email protected]>
AuthorDate: Wed Jun 22 15:21:51 2022 +0200
[FLINK-25454][runtime] Pause and resume time for throughput calculator only from one thread.
---
.../flink/streaming/runtime/tasks/StreamTask.java | 22 +---------------------
.../runtime/tasks/mailbox/MailboxProcessor.java | 2 ++
2 files changed, 3 insertions(+), 21 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 19e1e6b2f1c..2b74fa8f72f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -87,7 +87,6 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.bufferdebloat.BufferDebloater;
import org.apache.flink.streaming.runtime.tasks.mailbox.GaugePeriodTimer;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
-import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction.Suspension;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorFactory;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor;
import org.apache.flink.streaming.runtime.tasks.mailbox.PeriodTimer;
@@ -529,9 +528,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
ioMetrics.getIdleTimeMsPerSecond(),
throughputCalculator);
resumeFuture = inputProcessor.getAvailableFuture();
}
- assertNoException(
- resumeFuture.thenRun(
- new ResumeWrapper(controller.suspendDefaultAction(timer), timer)));
+ assertNoException(resumeFuture.thenRun(controller.suspendDefaultAction(timer)::resume));
}
protected void endData() throws Exception {
@@ -1700,23 +1697,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
return latestAsyncCheckpointStartDelayNanos;
}
- private static class ResumeWrapper implements Runnable {
- private final Suspension suspendedDefaultAction;
- private final PeriodTimer timer;
-
- public ResumeWrapper(Suspension suspendedDefaultAction, PeriodTimer timer) {
- this.suspendedDefaultAction = suspendedDefaultAction;
- timer.markStart();
- this.timer = timer;
- }
-
- @Override
- public void run() {
- timer.markEnd();
- suspendedDefaultAction.resume();
- }
- }
-
/**
* Implementation of {@link org.apache.flink.streaming.runtime.tasks.mailbox.PeriodTimer} which
* combine signal for metric and the throughput.
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
index a40162048fa..9b526191668 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
@@ -391,6 +391,7 @@ public class MailboxProcessor implements Closeable {
suspendedDefaultAction = new DefaultActionSuspension(suspensionTimer);
}
+ maybeRestartIdleTimer();
return suspendedDefaultAction;
}
@@ -468,6 +469,7 @@ public class MailboxProcessor implements Closeable {
}
private void resumeInternal() {
+ maybePauseIdleTimer();
if (suspendedDefaultAction == this) {
suspendedDefaultAction = null;
}