This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
     new 39866ce  [FLINK-24800][runtime] Changed the assert condition for 
checking buffer timeout disabling test
39866ce is described below

commit 39866ce3418974c9f9b746a3fb8b8e5c30a5f5db
Author: Anton Kalashnikov <[email protected]>
AuthorDate: Wed Nov 10 17:58:54 2021 +0100

    [FLINK-24800][runtime] Changed the assert condition for checking buffer 
timeout disabling test
---
 .../streaming/runtime/BufferTimeoutITCase.java     | 24 ++++++++++++++--------
 1 file changed, 15 insertions(+), 9 deletions(-)

diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BufferTimeoutITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BufferTimeoutITCase.java
index 8597820..992acbb 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BufferTimeoutITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BufferTimeoutITCase.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.test.streaming.runtime;
 
-import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
@@ -33,8 +33,7 @@ import org.junit.Test;
 
 import java.util.ArrayList;
 
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertTrue;
 
 /** Tests for {@link StreamExecutionEnvironment#setBufferTimeout(long)}. */
 public class BufferTimeoutITCase extends AbstractTestBase {
@@ -42,9 +41,9 @@ public class BufferTimeoutITCase extends AbstractTestBase {
     @Rule public final SharedObjects sharedObjects = SharedObjects.create();
 
     /**
-     * The test verifies that it is possible to disable buffer flushing. It 
emits a single record,
-     * which should not fill an entire buffer, thus it should not never reach 
the sink. We check the
-     * sink has not seen any records after 2 times the default buffer timeout.
+     * The test verifies that it is possible to disable explicit buffer 
flushing. It checks that
+     * OutputFlasher thread would not be started when the task is running. But 
this doesn't
+     * guarantee that the unfinished buffers can not be flushed by another 
events.
      */
     @Test
     public void testDisablingBufferTimeout() throws Exception {
@@ -71,7 +70,7 @@ public class BufferTimeoutITCase extends AbstractTestBase {
                         new SinkFunction<Integer>() {
 
                             @Override
-                            public void invoke(Integer value, Context context) 
throws Exception {
+                            public void invoke(Integer value, Context context) 
{
                                 results.get().add(value);
                             }
                         })
@@ -81,7 +80,14 @@ public class BufferTimeoutITCase extends AbstractTestBase {
         CommonTestUtils.waitForAllTaskRunning(
                 miniClusterResource.getMiniCluster(), jobClient.getJobID(), 
false);
 
-        Thread.sleep(2 * 
ExecutionOptions.BUFFER_TIMEOUT.defaultValue().toMillis());
-        assertThat(results.get().size(), equalTo(0));
+        assertTrue(
+                RecordWriter.DEFAULT_OUTPUT_FLUSH_THREAD_NAME + " thread is 
unexpectedly running",
+                Thread.getAllStackTraces().keySet().stream()
+                        .noneMatch(
+                                thread ->
+                                        thread.getName()
+                                                .startsWith(
+                                                        RecordWriter
+                                                                
.DEFAULT_OUTPUT_FLUSH_THREAD_NAME)));
     }
 }

Reply via email to