This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
     new 10e63412c7d [FLINK-25454][runtime] Pause and resume time for throughput calculator only from one thread.
10e63412c7d is described below

commit 10e63412c7d82b21c82072bd40413195ca3dbf31
Author: Anton Kalashnikov <[email protected]>
AuthorDate: Wed Jun 22 15:21:51 2022 +0200

    [FLINK-25454][runtime] Pause and resume time for throughput calculator only from one thread.
---
 .../flink/streaming/runtime/tasks/StreamTask.java  | 22 +---------------------
 .../runtime/tasks/mailbox/MailboxProcessor.java    |  2 ++
 2 files changed, 3 insertions(+), 21 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 19e1e6b2f1c..2b74fa8f72f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -87,7 +87,6 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.bufferdebloat.BufferDebloater;
 import org.apache.flink.streaming.runtime.tasks.mailbox.GaugePeriodTimer;
 import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
-import 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction.Suspension;
 import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorFactory;
 import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor;
 import org.apache.flink.streaming.runtime.tasks.mailbox.PeriodTimer;
@@ -529,9 +528,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
                             ioMetrics.getIdleTimeMsPerSecond(), 
throughputCalculator);
             resumeFuture = inputProcessor.getAvailableFuture();
         }
-        assertNoException(
-                resumeFuture.thenRun(
-                        new 
ResumeWrapper(controller.suspendDefaultAction(timer), timer)));
+        assertNoException(resumeFuture.thenRun(controller.suspendDefaultAction(timer)::resume));
     }
 
     protected void endData() throws Exception {
@@ -1700,23 +1697,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
         return latestAsyncCheckpointStartDelayNanos;
     }
 
-    private static class ResumeWrapper implements Runnable {
-        private final Suspension suspendedDefaultAction;
-        private final PeriodTimer timer;
-
-        public ResumeWrapper(Suspension suspendedDefaultAction, PeriodTimer 
timer) {
-            this.suspendedDefaultAction = suspendedDefaultAction;
-            timer.markStart();
-            this.timer = timer;
-        }
-
-        @Override
-        public void run() {
-            timer.markEnd();
-            suspendedDefaultAction.resume();
-        }
-    }
-
     /**
      * Implementation of {@link 
org.apache.flink.streaming.runtime.tasks.mailbox.PeriodTimer} which
      * combine signal for metric and the throughput.
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
index a40162048fa..9b526191668 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
@@ -391,6 +391,7 @@ public class MailboxProcessor implements Closeable {
             suspendedDefaultAction = new DefaultActionSuspension(suspensionTimer);
         }
 
+        maybeRestartIdleTimer();
         return suspendedDefaultAction;
     }
 
@@ -468,6 +469,7 @@ public class MailboxProcessor implements Closeable {
         }
 
         private void resumeInternal() {
+            maybePauseIdleTimer();
             if (suspendedDefaultAction == this) {
                 suspendedDefaultAction = null;
             }

Reply via email to