This is an automated email from the ASF dual-hosted git repository.

yuanmei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new ff416c6  [FLINK-23680][streaming] Wait until all expected threads would be triggered in testOpenCloseAndTimestamps (#16797)
ff416c6 is described below

commit ff416c644f6f49d14838b0bc664a1827707f5faa
Author: Anton Kalashnikov <[email protected]>
AuthorDate: Tue Aug 17 06:00:28 2021 +0200

    [FLINK-23680][streaming] Wait until all expected threads would be triggered in testOpenCloseAndTimestamps (#16797)
---
 .../runtime/operators/StreamTaskTimerTest.java     | 21 ++++++++++++----
 .../runtime/tasks/StreamTaskTestHarness.java       | 29 ++++++++++++++++------
 2 files changed, 37 insertions(+), 13 deletions(-)

diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
index e126a26..cab02b2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.operators;
 
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.StreamMap;
@@ -33,10 +34,13 @@ import org.apache.flink.util.TestLogger;
 
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.Timeout;
 
 import javax.annotation.Nullable;
 
+import java.time.Duration;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.hamcrest.Matchers.instanceOf;
@@ -50,6 +54,7 @@ public class StreamTaskTimerTest extends TestLogger {
 
     private StreamTaskTestHarness<?> testHarness;
     private ProcessingTimeService timeService;
+    @Rule public final Timeout timeoutPerTest = Timeout.seconds(20);
 
     @Before
     public void setup() throws Exception {
@@ -70,11 +75,12 @@ public class StreamTaskTimerTest extends TestLogger {
     }
 
     @Test
-    public void testOpenCloseAndTimestamps() {
-        // first one spawns thread
-        timeService.registerTimer(System.currentTimeMillis(), timestamp -> {});
-
-        assertEquals(2, StreamTask.TRIGGER_THREAD_GROUP.activeCount());
+    public void testOpenCloseAndTimestamps() throws InterruptedException {
+        // Wait for StreamTask#invoke spawn the timeService threads for the throughput calculation.
+        while (StreamTask.TRIGGER_THREAD_GROUP.activeCount() != 1) {
+            Thread.sleep(1);
+        }
+        // The timeout would happen if spawning the thread failed.
     }
 
     @Test
@@ -179,6 +185,11 @@ public class StreamTaskTimerTest extends TestLogger {
                         BasicTypeInfo.STRING_TYPE_INFO);
 
         testHarness.setupOutputForSingletonOperatorChain();
+        // Making it impossible to execute the throughput calculation even once during the test.
+        testHarness
+                .getTaskManagerRuntimeInfo()
+                .getConfiguration()
+                .set(TaskManagerOptions.BUFFER_DEBLOAT_PERIOD, Duration.ofMinutes(10));
 
         StreamConfig streamConfig = testHarness.getStreamConfig();
         streamConfig.setChainIndex(0);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index d80d5cd..75601af 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -47,6 +47,8 @@ import org.apache.flink.runtime.state.LocalRecoveryConfig;
 import org.apache.flink.runtime.state.LocalRecoveryDirectoryProviderImpl;
 import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
 import org.apache.flink.runtime.state.TestTaskStateManager;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
+import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.graph.StreamEdge;
@@ -113,6 +115,7 @@ public class StreamTaskTestHarness<OUT> {
     public Configuration jobConfig;
     public Configuration taskConfig;
     protected StreamConfig streamConfig;
+    protected TaskManagerRuntimeInfo taskManagerRuntimeInfo = new TestingTaskManagerRuntimeInfo();
 
     protected TestTaskStateManager taskStateManager;
 
@@ -184,6 +187,10 @@ public class StreamTaskTestHarness<OUT> {
         return taskThread.task.getTimerService();
     }
 
+    public TaskManagerRuntimeInfo getTaskManagerRuntimeInfo() {
+        return taskManagerRuntimeInfo;
+    }
+
     @SuppressWarnings("unchecked")
     public <OP extends StreamOperator<OUT>> OP getHeadOperator() {
         return (OP) taskThread.task.getMainOperator();
@@ -251,14 +258,20 @@ public class StreamTaskTestHarness<OUT> {
     }
 
     public StreamMockEnvironment createEnvironment() {
-        return new StreamMockEnvironment(
-                jobConfig,
-                taskConfig,
-                executionConfig,
-                memorySize,
-                new MockInputSplitProvider(),
-                bufferSize,
-                taskStateManager);
+        StreamMockEnvironment streamMockEnvironment =
+                new StreamMockEnvironment(
+                        jobConfig,
+                        taskConfig,
+                        executionConfig,
+                        memorySize,
+                        new MockInputSplitProvider(),
+                        bufferSize,
+                        taskStateManager);
+        if (taskManagerRuntimeInfo != null) {
+            streamMockEnvironment.setTaskManagerInfo(taskManagerRuntimeInfo);
+        }
+
+        return streamMockEnvironment;
     }
 
     /**

Reply via email to