This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.14 by this push:
new 39866ce [FLINK-24800][runtime] Changed the assert condition for
checking buffer timeout disabling test
39866ce is described below
commit 39866ce3418974c9f9b746a3fb8b8e5c30a5f5db
Author: Anton Kalashnikov <[email protected]>
AuthorDate: Wed Nov 10 17:58:54 2021 +0100
[FLINK-24800][runtime] Changed the assert condition for checking buffer
timeout disabling test
---
.../streaming/runtime/BufferTimeoutITCase.java | 24 ++++++++++++++--------
1 file changed, 15 insertions(+), 9 deletions(-)
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BufferTimeoutITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BufferTimeoutITCase.java
index 8597820..992acbb 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BufferTimeoutITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BufferTimeoutITCase.java
@@ -18,8 +18,8 @@
package org.apache.flink.test.streaming.runtime;
-import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
@@ -33,8 +33,7 @@ import org.junit.Test;
import java.util.ArrayList;
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertTrue;
/** Tests for {@link StreamExecutionEnvironment#setBufferTimeout(long)}. */
public class BufferTimeoutITCase extends AbstractTestBase {
@@ -42,9 +41,9 @@ public class BufferTimeoutITCase extends AbstractTestBase {
@Rule public final SharedObjects sharedObjects = SharedObjects.create();
/**
- * The test verifies that it is possible to disable buffer flushing. It
emits a single record,
- * which should not fill an entire buffer, thus it should not never reach
the sink. We check the
- * sink has not seen any records after 2 times the default buffer timeout.
+ * The test verifies that it is possible to disable explicit buffer
flushing. It checks that
+ * OutputFlasher thread would not be started when the task is running. But
this doesn't
+ * guarantee that the unfinished buffers can not be flushed by another
events.
*/
@Test
public void testDisablingBufferTimeout() throws Exception {
@@ -71,7 +70,7 @@ public class BufferTimeoutITCase extends AbstractTestBase {
new SinkFunction<Integer>() {
@Override
- public void invoke(Integer value, Context context)
throws Exception {
+ public void invoke(Integer value, Context context)
{
results.get().add(value);
}
})
@@ -81,7 +80,14 @@ public class BufferTimeoutITCase extends AbstractTestBase {
CommonTestUtils.waitForAllTaskRunning(
miniClusterResource.getMiniCluster(), jobClient.getJobID(),
false);
- Thread.sleep(2 *
ExecutionOptions.BUFFER_TIMEOUT.defaultValue().toMillis());
- assertThat(results.get().size(), equalTo(0));
+ assertTrue(
+ RecordWriter.DEFAULT_OUTPUT_FLUSH_THREAD_NAME + " thread is
unexpectedly running",
+ Thread.getAllStackTraces().keySet().stream()
+ .noneMatch(
+ thread ->
+ thread.getName()
+ .startsWith(
+ RecordWriter
+
.DEFAULT_OUTPUT_FLUSH_THREAD_NAME)));
}
}