This is an automated email from the ASF dual-hosted git repository.
yuanmei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new ff416c6 [FLINK-23680][streaming] Wait until all expected threads
would be triggered in testOpenCloseAndTimestamps (#16797)
ff416c6 is described below
commit ff416c644f6f49d14838b0bc664a1827707f5faa
Author: Anton Kalashnikov <[email protected]>
AuthorDate: Tue Aug 17 06:00:28 2021 +0200
[FLINK-23680][streaming] Wait until all expected threads would be triggered
in testOpenCloseAndTimestamps (#16797)
---
.../runtime/operators/StreamTaskTimerTest.java | 21 ++++++++++++----
.../runtime/tasks/StreamTaskTestHarness.java | 29 ++++++++++++++++------
2 files changed, 37 insertions(+), 13 deletions(-)
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
index e126a26..cab02b2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.operators;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.StreamMap;
@@ -33,10 +34,13 @@ import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.Timeout;
import javax.annotation.Nullable;
+import java.time.Duration;
import java.util.concurrent.atomic.AtomicReference;
import static org.hamcrest.Matchers.instanceOf;
@@ -50,6 +54,7 @@ public class StreamTaskTimerTest extends TestLogger {
private StreamTaskTestHarness<?> testHarness;
private ProcessingTimeService timeService;
+ @Rule public final Timeout timeoutPerTest = Timeout.seconds(20);
@Before
public void setup() throws Exception {
@@ -70,11 +75,12 @@ public class StreamTaskTimerTest extends TestLogger {
}
@Test
- public void testOpenCloseAndTimestamps() {
- // first one spawns thread
- timeService.registerTimer(System.currentTimeMillis(), timestamp -> {});
-
- assertEquals(2, StreamTask.TRIGGER_THREAD_GROUP.activeCount());
+ public void testOpenCloseAndTimestamps() throws InterruptedException {
+ // Wait for StreamTask#invoke spawn the timeService threads for the throughput calculation.
+ while (StreamTask.TRIGGER_THREAD_GROUP.activeCount() != 1) {
+ Thread.sleep(1);
+ }
+ // The timeout would happen if spawning the thread failed.
}
@Test
@@ -179,6 +185,11 @@ public class StreamTaskTimerTest extends TestLogger {
BasicTypeInfo.STRING_TYPE_INFO);
testHarness.setupOutputForSingletonOperatorChain();
+ // Making it impossible to execute the throughput calculation even once during the test.
+ testHarness
+ .getTaskManagerRuntimeInfo()
+ .getConfiguration()
+ .set(TaskManagerOptions.BUFFER_DEBLOAT_PERIOD, Duration.ofMinutes(10));
StreamConfig streamConfig = testHarness.getStreamConfig();
streamConfig.setChainIndex(0);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index d80d5cd..75601af 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -47,6 +47,8 @@ import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.LocalRecoveryDirectoryProviderImpl;
import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
import org.apache.flink.runtime.state.TestTaskStateManager;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
+import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamEdge;
@@ -113,6 +115,7 @@ public class StreamTaskTestHarness<OUT> {
public Configuration jobConfig;
public Configuration taskConfig;
protected StreamConfig streamConfig;
+ protected TaskManagerRuntimeInfo taskManagerRuntimeInfo = new TestingTaskManagerRuntimeInfo();
protected TestTaskStateManager taskStateManager;
@@ -184,6 +187,10 @@ public class StreamTaskTestHarness<OUT> {
return taskThread.task.getTimerService();
}
+ public TaskManagerRuntimeInfo getTaskManagerRuntimeInfo() {
+ return taskManagerRuntimeInfo;
+ }
+
@SuppressWarnings("unchecked")
public <OP extends StreamOperator<OUT>> OP getHeadOperator() {
return (OP) taskThread.task.getMainOperator();
@@ -251,14 +258,20 @@ public class StreamTaskTestHarness<OUT> {
}
public StreamMockEnvironment createEnvironment() {
- return new StreamMockEnvironment(
- jobConfig,
- taskConfig,
- executionConfig,
- memorySize,
- new MockInputSplitProvider(),
- bufferSize,
- taskStateManager);
+ StreamMockEnvironment streamMockEnvironment =
+ new StreamMockEnvironment(
+ jobConfig,
+ taskConfig,
+ executionConfig,
+ memorySize,
+ new MockInputSplitProvider(),
+ bufferSize,
+ taskStateManager);
+ if (taskManagerRuntimeInfo != null) {
+ streamMockEnvironment.setTaskManagerInfo(taskManagerRuntimeInfo);
+ }
+
+ return streamMockEnvironment;
}
/**