This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9d2b74bf502cac3274cd6f2dc72db56c056ff92b
Author: Nico Kruber <[email protected]>
AuthorDate: Thu Sep 13 12:25:24 2018 +0200

    [hotfix][network] adapt InputGateConcurrentTest to really follow our 
guarantees
    
    - producers should flush after writing to make sure all data has been sent
    - we can only check bufferConsumer.isFinished() after building a Buffer
    - producer/consumer threads should be named
---
 .../network/partition/InputGateConcurrentTest.java | 22 ++++++++++++++++++++--
 1 file changed, 20 insertions(+), 2 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
index 5f5728d..5c643af 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import 
org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
@@ -198,6 +199,8 @@ public class InputGateConcurrentTest {
        private abstract static class Source {
 
                abstract void addBufferConsumer(BufferConsumer bufferConsumer) 
throws Exception;
+
+               abstract void flush();
        }
 
        private static class PipelinedSubpartitionSource extends Source {
@@ -212,6 +215,11 @@ public class InputGateConcurrentTest {
                void addBufferConsumer(BufferConsumer bufferConsumer) throws 
Exception {
                        partition.add(bufferConsumer);
                }
+
+               @Override
+               void flush() {
+                       partition.flush();
+               }
        }
 
        private static class RemoteChannelSource extends Source {
@@ -225,14 +233,19 @@ public class InputGateConcurrentTest {
 
                @Override
                void addBufferConsumer(BufferConsumer bufferConsumer) throws 
Exception {
-                       checkState(bufferConsumer.isFinished(), "Handling of 
non finished buffers is not yet implemented");
                        try {
-                               channel.onBuffer(bufferConsumer.build(), seq++, 
-1);
+                               Buffer buffer = bufferConsumer.build();
+                               checkState(bufferConsumer.isFinished(), 
"Handling of non finished buffers is not yet implemented");
+                               channel.onBuffer(buffer, seq++, -1);
                        }
                        finally {
                                bufferConsumer.close();
                        }
                }
+
+               @Override
+               void flush() {
+               }
        }
 
        // 
------------------------------------------------------------------------
@@ -248,6 +261,7 @@ public class InputGateConcurrentTest {
                private final int yieldAfter;
 
                ProducerThread(Source[] sources, int numTotal, int maxChunk, 
int yieldAfter) {
+                       super("producer");
                        this.sources = sources;
                        this.numTotal = numTotal;
                        this.maxChunk = maxChunk;
@@ -276,7 +290,10 @@ public class InputGateConcurrentTest {
                                        //noinspection CallToThreadYield
                                        Thread.yield();
                                }
+                       }
 
+                       for (Source source : sources) {
+                               source.flush();
                        }
                }
        }
@@ -287,6 +304,7 @@ public class InputGateConcurrentTest {
                private final int numBuffers;
 
                ConsumerThread(SingleInputGate gate, int numBuffers) {
+                       super("consumer");
                        this.gate = gate;
                        this.numBuffers = numBuffers;
                }

Reply via email to