This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9c68f02c5635b1e2e426a8da52c23c84d0bf4fdf
Author: Piotr Nowojski <[email protected]>
AuthorDate: Wed Dec 30 17:43:10 2020 +0100

    [FLINK-20717][metrics] Provide new backPressuredTimeMsPerSecond metric
---
 docs/ops/metrics.md                                |   7 +-
 docs/ops/metrics.zh.md                             |   7 +-
 .../partition/BufferWritingResultPartition.java    |  13 +--
 .../apache/flink/runtime/metrics/MetricNames.java  |   1 +
 .../runtime/metrics/groups/TaskIOMetricGroup.java  |   7 ++
 .../io/network/partition/ResultPartitionTest.java  |   9 +-
 .../flink/streaming/runtime/tasks/StreamTask.java  |  49 ++++++----
 .../runtime/tasks/mailbox/MailboxProcessor.java    |  17 ----
 .../streaming/runtime/tasks/StreamTaskTest.java    | 102 +++++++++++++++++++++
 .../tasks/mailbox/TaskMailboxProcessorTest.java    |  67 --------------
 10 files changed, 166 insertions(+), 113 deletions(-)

diff --git a/docs/ops/metrics.md b/docs/ops/metrics.md
index 9b7a6c3..e2e9370 100644
--- a/docs/ops/metrics.md
+++ b/docs/ops/metrics.md
@@ -1250,7 +1250,12 @@ Certain RocksDB native metrics are available but 
disabled by default, you can fi
     </tr>
     <tr>
       <td>idleTimeMsPerSecond</td>
-      <td>The time (in milliseconds) this task is idle (either has no data to 
process or it is back pressured) per second.</td>
+      <td>The time (in milliseconds) this task is idle (has no data to 
process) per second. Idle time excludes back pressured time, so if the task is 
back pressured it is not idle.</td>
+      <td>Meter</td>
+    </tr>
+    <tr>
+      <td>backPressuredTimeMsPerSecond</td>
+      <td>The time (in milliseconds) this task is back pressured per 
second.</td>
       <td>Meter</td>
     </tr>
     <tr>
diff --git a/docs/ops/metrics.zh.md b/docs/ops/metrics.zh.md
index eac4b55..19fcba3 100644
--- a/docs/ops/metrics.zh.md
+++ b/docs/ops/metrics.zh.md
@@ -1250,7 +1250,12 @@ Certain RocksDB native metrics are available but 
disabled by default, you can fi
     </tr>
     <tr>
       <td>idleTimeMsPerSecond</td>
-      <td>The time (in milliseconds) this task is idle (either has no data to 
process or it is back pressured) per second.</td>
+      <td>The time (in milliseconds) this task is idle (has no data to 
process) per second. Idle time excludes back pressured time, so if the task is 
back pressured it is not idle.</td>
+      <td>Meter</td>
+    </tr>
+    <tr>
+      <td>backPressuredTimeMsPerSecond</td>
+      <td>The time (in milliseconds) this task is back pressured per 
second.</td>
       <td>Meter</td>
     </tr>
     <tr>
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
index 5a36831..c5e1138 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
@@ -64,7 +64,7 @@ public abstract class BufferWritingResultPartition extends 
ResultPartition {
     /** For broadcast mode, a single BufferBuilder is shared by all 
subpartitions. */
     private BufferBuilder broadcastBufferBuilder;
 
-    private Meter idleTimeMsPerSecond = new MeterView(new SimpleCounter());
+    private Meter backPressuredTimeMsPerSecond = new MeterView(new 
SimpleCounter());
 
     public BufferWritingResultPartition(
             String owningTaskName,
@@ -193,7 +193,7 @@ public abstract class BufferWritingResultPartition extends 
ResultPartition {
     @Override
     public void setMetricGroup(TaskIOMetricGroup metrics) {
         super.setMetricGroup(metrics);
-        idleTimeMsPerSecond = metrics.getIdleTimeMsPerSecond();
+        backPressuredTimeMsPerSecond = metrics.getBackPressuredTimePerSecond();
     }
 
     @Override
@@ -335,10 +335,11 @@ public abstract class BufferWritingResultPartition 
extends ResultPartition {
             return bufferBuilder;
         }
 
-        final long start = System.currentTimeMillis();
         try {
+            long start = System.currentTimeMillis();
             bufferBuilder = 
bufferPool.requestBufferBuilderBlocking(targetSubpartition);
-            idleTimeMsPerSecond.markEvent(System.currentTimeMillis() - start);
+            long backPressuredTime = System.currentTimeMillis() - start;
+            backPressuredTimeMsPerSecond.markEvent(backPressuredTime);
             return bufferBuilder;
         } catch (InterruptedException e) {
             throw new IOException("Interrupted while waiting for buffer");
@@ -377,8 +378,8 @@ public abstract class BufferWritingResultPartition extends 
ResultPartition {
     }
 
     @VisibleForTesting
-    public Meter getIdleTimeMsPerSecond() {
-        return idleTimeMsPerSecond;
+    public Meter getBackPressuredTimeMsPerSecond() {
+        return backPressuredTimeMsPerSecond;
     }
 
     @VisibleForTesting
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
index ccb442b..6a58de7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
@@ -67,4 +67,5 @@ public class MetricNames {
     }
 
     public static final String TASK_IDLE_TIME = "idleTimeMs" + SUFFIX_RATE;
+    public static final String TASK_BACK_PRESSURED_TIME = 
"backPressuredTimeMs" + SUFFIX_RATE;
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
index 1a449f9..9be2180 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
@@ -46,6 +46,7 @@ public class TaskIOMetricGroup extends 
ProxyMetricGroup<TaskMetricGroup> {
     private final Meter numRecordsOutRate;
     private final Meter numBuffersOutRate;
     private final Meter idleTimePerSecond;
+    private final Meter backPressuredTimePerSecond;
 
     public TaskIOMetricGroup(TaskMetricGroup parent) {
         super(parent);
@@ -68,6 +69,8 @@ public class TaskIOMetricGroup extends 
ProxyMetricGroup<TaskMetricGroup> {
 
         this.idleTimePerSecond =
                 meter(MetricNames.TASK_IDLE_TIME, new MeterView(new 
SimpleCounter()));
+        this.backPressuredTimePerSecond =
+                meter(MetricNames.TASK_BACK_PRESSURED_TIME, new MeterView(new 
SimpleCounter()));
     }
 
     public IOMetrics createSnapshot() {
@@ -102,6 +105,10 @@ public class TaskIOMetricGroup extends 
ProxyMetricGroup<TaskMetricGroup> {
         return idleTimePerSecond;
     }
 
+    public Meter getBackPressuredTimePerSecond() {
+        return backPressuredTimePerSecond;
+    }
+
     // 
============================================================================================
     // Metric Reuse
     // 
============================================================================================
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
index 8d01a66..059c815 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
@@ -493,7 +493,7 @@ public class ResultPartitionTest {
     }
 
     @Test
-    public void testIdleTime() throws IOException, InterruptedException {
+    public void testIdleAndBackPressuredTime() throws IOException, 
InterruptedException {
         // setup
         int bufferSize = 1024;
         NetworkBufferPool globalPool = new NetworkBufferPool(10, bufferSize);
@@ -509,8 +509,8 @@ public class ResultPartitionTest {
         Buffer buffer = readView.getNextBuffer().buffer();
         assertNotNull(buffer);
 
-        // idle time is zero when there is buffer available.
-        assertEquals(0, resultPartition.getIdleTimeMsPerSecond().getCount());
+        // back-pressured time is zero when there is buffer available.
+        assertEquals(0, 
resultPartition.getBackPressuredTimeMsPerSecond().getCount());
 
         CountDownLatch syncLock = new CountDownLatch(1);
         final Thread requestThread =
@@ -536,7 +536,8 @@ public class ResultPartitionTest {
         requestThread.join();
 
         Assert.assertThat(
-                resultPartition.getIdleTimeMsPerSecond().getCount(), 
Matchers.greaterThan(0L));
+                resultPartition.getBackPressuredTimeMsPerSecond().getCount(),
+                Matchers.greaterThan(0L));
         assertNotNull(readView.getNextBuffer().buffer());
     }
 
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 03fe18c..8d3ce70 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.io.InputStatus;
 import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
@@ -47,6 +48,7 @@ import 
org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
@@ -72,6 +74,7 @@ import 
org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
 import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
+import 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction.Suspension;
 import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorFactory;
 import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor;
 import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox;
@@ -304,7 +307,6 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>> extends Ab
         this.recordWriter = createRecordWriterDelegate(configuration, 
environment);
         this.actionExecutor = Preconditions.checkNotNull(actionExecutor);
         this.mailboxProcessor = new MailboxProcessor(this::processInput, 
mailbox, actionExecutor);
-        this.mailboxProcessor.initMetric(environment.getMetricGroup());
         this.mainMailboxExecutor = mailboxProcessor.getMainMailboxExecutor();
         this.asyncExceptionHandler = new 
StreamTaskAsyncExceptionHandler(environment);
         this.asyncOperationsThreadPool =
@@ -400,25 +402,20 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>> extends Ab
             controller.allActionsCompleted();
             return;
         }
-        CompletableFuture<?> jointFuture = getInputOutputJointFuture(status);
-        MailboxDefaultAction.Suspension suspendedDefaultAction = 
controller.suspendDefaultAction();
-        assertNoException(jointFuture.thenRun(suspendedDefaultAction::resume));
-    }
 
-    /**
-     * Considers three scenarios to combine input and output futures: 1. Both 
input and output are
-     * unavailable. 2. Only input is unavailable. 3. Only output is 
unavailable.
-     */
-    @VisibleForTesting
-    CompletableFuture<?> getInputOutputJointFuture(InputStatus status) {
-        if (status == InputStatus.NOTHING_AVAILABLE && 
!recordWriter.isAvailable()) {
-            return CompletableFuture.allOf(
-                    inputProcessor.getAvailableFuture(), 
recordWriter.getAvailableFuture());
-        } else if (status == InputStatus.NOTHING_AVAILABLE) {
-            return inputProcessor.getAvailableFuture();
+        TaskIOMetricGroup ioMetrics = 
getEnvironment().getMetricGroup().getIOMetricGroup();
+        final long startTime = System.currentTimeMillis();
+        Meter timer;
+        CompletableFuture<?> resumeFuture;
+        if (!recordWriter.isAvailable()) {
+            timer = ioMetrics.getBackPressuredTimePerSecond();
+            resumeFuture = recordWriter.getAvailableFuture();
         } else {
-            return recordWriter.getAvailableFuture();
+            timer = ioMetrics.getIdleTimeMsPerSecond();
+            resumeFuture = inputProcessor.getAvailableFuture();
         }
+        assertNoException(
+                resumeFuture.thenRun(new 
ResumeWrapper(controller.suspendDefaultAction(), timer, startTime)));
     }
 
     private void resetSynchronousSavepointId() {
@@ -1321,4 +1318,22 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>> extends Ab
     protected long getAsyncCheckpointStartDelayNanos() {
         return latestAsyncCheckpointStartDelayNanos;
     }
+
+    private static class ResumeWrapper implements Runnable {
+        private final Suspension suspendedDefaultAction;
+        private final Meter timer;
+        private final long startTime;
+
+        public ResumeWrapper(Suspension suspendedDefaultAction, Meter timer, 
long startTime) {
+            this.suspendedDefaultAction = suspendedDefaultAction;
+            this.timer = timer;
+            this.startTime = startTime;
+        }
+
+        @Override
+        public void run() {
+            timer.markEvent(System.currentTimeMillis() - startTime);
+            suspendedDefaultAction.resume();
+        }
+    }
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
index 9626c0a..699b9e9 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
@@ -19,10 +19,6 @@ package org.apache.flink.streaming.runtime.tasks.mailbox;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.metrics.Meter;
-import org.apache.flink.metrics.MeterView;
-import org.apache.flink.metrics.SimpleCounter;
-import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.streaming.api.operators.MailboxExecutor;
 import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
 import 
org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox.MailboxClosedException;
@@ -91,8 +87,6 @@ public class MailboxProcessor implements Closeable {
 
     private final StreamTaskActionExecutor actionExecutor;
 
-    private Meter idleTime = new MeterView(new SimpleCounter());
-
     @VisibleForTesting
     public MailboxProcessor() {
         this(MailboxDefaultAction.Controller::suspendDefaultAction);
@@ -131,10 +125,6 @@ public class MailboxProcessor implements Closeable {
         return new MailboxExecutorImpl(mailbox, priority, actionExecutor, 
this);
     }
 
-    public void initMetric(TaskMetricGroup metricGroup) {
-        idleTime = metricGroup.getIOMetricGroup().getIdleTimeMsPerSecond();
-    }
-
     /** Lifecycle method to close the mailbox for action submission. */
     public void prepareClose() {
         mailbox.quiesce();
@@ -315,9 +305,7 @@ public class MailboxProcessor implements Closeable {
         while (isDefaultActionUnavailable() && isMailboxLoopRunning()) {
             maybeMail = mailbox.tryTake(MIN_PRIORITY);
             if (!maybeMail.isPresent()) {
-                long start = System.currentTimeMillis();
                 maybeMail = Optional.of(mailbox.take(MIN_PRIORITY));
-                idleTime.markEvent(System.currentTimeMillis() - start);
             }
             maybeMail.get().run();
             processed = true;
@@ -355,11 +343,6 @@ public class MailboxProcessor implements Closeable {
     }
 
     @VisibleForTesting
-    public Meter getIdleTime() {
-        return idleTime;
-    }
-
-    @VisibleForTesting
     public boolean hasMail() {
         return mailbox.hasMail();
     }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 6706dd8..ffcf603 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -51,6 +51,7 @@ import 
org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
@@ -116,6 +117,7 @@ import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.function.RunnableWithException;
 import org.apache.flink.util.function.SupplierWithException;
 
+import org.hamcrest.Matchers;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -1241,6 +1243,7 @@ public class StreamTaskTest extends TestLogger {
 
     @Test
     public void testProcessWithUnAvailableOutput() throws Exception {
+        final long sleepTime = 42;
         try (final MockEnvironment environment = setupEnvironment(new 
boolean[] {true, false})) {
             final int numberOfProcessCalls = 10;
             final AvailabilityTestInputProcessor inputProcessor =
@@ -1253,6 +1256,7 @@ public class StreamTaskTest extends TestLogger {
 
             final RunnableWithException completeFutureTask =
                     () -> {
+                        Thread.sleep(sleepTime + 1);
                         assertEquals(1, inputProcessor.currentNumProcessCalls);
                         
assertTrue(task.mailboxProcessor.isDefaultActionUnavailable());
                         
environment.getWriter(1).getAvailableFuture().complete(null);
@@ -1266,11 +1270,39 @@ public class StreamTaskTest extends TestLogger {
                     },
                     "This task will submit another task to execute after 
processing input once.");
 
+            TaskIOMetricGroup ioMetricGroup =
+                    task.getEnvironment().getMetricGroup().getIOMetricGroup();
             task.invoke();
+            assertThat(
+                    ioMetricGroup.getBackPressuredTimePerSecond().getCount(),
+                    Matchers.greaterThanOrEqualTo(sleepTime));
+            assertThat(ioMetricGroup.getIdleTimeMsPerSecond().getCount(), 
is(0L));
             assertEquals(numberOfProcessCalls, 
inputProcessor.currentNumProcessCalls);
         }
     }
 
+    @Test
+    public void testProcessWithUnAvailableInput() throws Exception {
+        final long unAvailableTime = 42;
+        try (final MockEnvironment environment = setupEnvironment(new 
boolean[] {true, true})) {
+            final UnAvailableTestInputProcessor inputProcessor =
+                    new UnAvailableTestInputProcessor(unAvailableTime);
+            final StreamTask task =
+                    new MockStreamTaskBuilder(environment)
+                            .setStreamInputProcessor(inputProcessor)
+                            .build();
+
+            TaskIOMetricGroup ioMetricGroup =
+                    task.getEnvironment().getMetricGroup().getIOMetricGroup();
+            task.invoke();
+
+            assertThat(
+                    ioMetricGroup.getIdleTimeMsPerSecond().getCount(),
+                    Matchers.greaterThanOrEqualTo(unAvailableTime));
+            
assertThat(ioMetricGroup.getBackPressuredTimePerSecond().getCount(), is(0L));
+        }
+    }
+
     private MockEnvironment setupEnvironment(boolean[] outputAvailabilities) {
         final Configuration configuration = new Configuration();
         new MockStreamConfig(configuration, outputAvailabilities.length);
@@ -1433,6 +1465,76 @@ public class StreamTaskTest extends TestLogger {
         }
     }
 
+    /**
+     * A stream input processor whose input stays unavailable for a specified amount of time,
+     * after which the processor reports the end of input.
+     */
+    private static class UnAvailableTestInputProcessor implements 
StreamInputProcessor {
+        private final AvailabilityHelper availabilityProvider = new 
AvailabilityHelper();
+        private final Thread timerThread;
+
+        private boolean timerTriggered;
+
+        private volatile Exception asyncException;
+
+        public UnAvailableTestInputProcessor(long unAvailableTime) {
+            timerThread =
+                    new Thread() {
+                        @Override
+                        public void run() {
+                            try {
+                                Thread.sleep(unAvailableTime);
+                                availabilityProvider
+                                        .getUnavailableToResetAvailable()
+                                        .complete(null);
+                            } catch (Exception e) {
+                                asyncException = e;
+                            }
+                        }
+                    };
+        }
+
+        @Override
+        public InputStatus processInput() {
+            maybeTriggerTimer();
+            return availabilityProvider.isAvailable()
+                    ? InputStatus.END_OF_INPUT
+                    : InputStatus.NOTHING_AVAILABLE;
+        }
+
+        @Override
+        public CompletableFuture<Void> prepareSnapshot(
+                ChannelStateWriter channelStateWriter, final long 
checkpointId) {
+            return FutureUtils.completedVoidFuture();
+        }
+
+        @Override
+        public void close() throws IOException {
+            if (asyncException != null) {
+                throw new IOException(asyncException);
+            }
+            try {
+                timerThread.join();
+            } catch (InterruptedException e) {
+                throw new IOException(e);
+            }
+        }
+
+        @Override
+        public CompletableFuture<?> getAvailableFuture() {
+            maybeTriggerTimer();
+            return availabilityProvider.getAvailableFuture();
+        }
+
+        private void maybeTriggerTimer() {
+            if (timerTriggered) {
+                return;
+            }
+            timerTriggered = true;
+            timerThread.start();
+        }
+    }
+
     private static class BlockingCloseStreamOperator extends 
AbstractStreamOperator<Void> {
         private static final long serialVersionUID = -9042150529568008847L;
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxProcessorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxProcessorTest.java
index e787338..94b5f5d 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxProcessorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxProcessorTest.java
@@ -23,13 +23,11 @@ import 
org.apache.flink.streaming.api.operators.MailboxExecutor;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.function.RunnableWithException;
 
-import org.hamcrest.Matchers;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -254,71 +252,6 @@ public class TaskMailboxProcessorTest {
         mailboxProcessor.allActionsCompleted();
     }
 
-    @Test
-    public void testNoIdleTimeWhenBusy() throws InterruptedException {
-        final AtomicReference<MailboxDefaultAction.Suspension> 
suspendedActionRef =
-                new AtomicReference<>();
-        final int totalSwitches = 10;
-
-        AtomicInteger count = new AtomicInteger();
-        MailboxThread mailboxThread =
-                new MailboxThread() {
-                    @Override
-                    public void runDefaultAction(Controller controller) {
-                        int currentCount = count.incrementAndGet();
-                        if (currentCount == totalSwitches) {
-                            controller.allActionsCompleted();
-                        }
-                    }
-                };
-        mailboxThread.start();
-        final MailboxProcessor mailboxProcessor = 
mailboxThread.getMailboxProcessor();
-
-        mailboxThread.signalStart();
-        mailboxThread.join();
-
-        Assert.assertEquals(0, mailboxProcessor.getIdleTime().getCount());
-        Assert.assertEquals(totalSwitches, count.get());
-    }
-
-    @Test
-    public void testIdleTime() throws InterruptedException {
-        final AtomicReference<MailboxDefaultAction.Suspension> 
suspendedActionRef =
-                new AtomicReference<>();
-        final int totalSwitches = 2;
-
-        CountDownLatch syncLock = new CountDownLatch(1);
-        MailboxThread mailboxThread =
-                new MailboxThread() {
-                    int count = 0;
-
-                    @Override
-                    public void runDefaultAction(Controller controller) {
-                        // If this is violated, it means that the default 
action was invoked while
-                        // we assumed suspension
-                        Assert.assertTrue(
-                                suspendedActionRef.compareAndSet(
-                                        null, 
controller.suspendDefaultAction()));
-                        ++count;
-                        if (count == totalSwitches) {
-                            controller.allActionsCompleted();
-                        }
-                        syncLock.countDown();
-                    }
-                };
-        mailboxThread.start();
-        final MailboxProcessor mailboxProcessor = 
mailboxThread.getMailboxProcessor();
-        mailboxThread.signalStart();
-
-        syncLock.await();
-        Thread.sleep(10);
-        mailboxProcessor
-                .getMailboxExecutor(DEFAULT_PRIORITY)
-                .execute(suspendedActionRef.get()::resume, "resume");
-        mailboxThread.join();
-        Assert.assertThat(mailboxProcessor.getIdleTime().getCount(), 
Matchers.greaterThan(0L));
-    }
-
     private static MailboxProcessor start(MailboxThread mailboxThread) {
         mailboxThread.start();
         final MailboxProcessor mailboxProcessor = 
mailboxThread.getMailboxProcessor();

Reply via email to