This is an automated email from the ASF dual-hosted git repository. nkruber pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 9d2b74bf502cac3274cd6f2dc72db56c056ff92b Author: Nico Kruber <[email protected]> AuthorDate: Thu Sep 13 12:25:24 2018 +0200 [hotfix][network] adapt InputGateConcurrentTest to really follow our guarantees - producers should flush after writing to make sure all data has been sent - we can only check bufferConsumer.isFinished() after building a Buffer - producer/consumer threads should be named --- .../network/partition/InputGateConcurrentTest.java | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java index 5f5728d..5c643af 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java @@ -23,6 +23,7 @@ import org.apache.flink.core.testutils.CheckedThread; import org.apache.flink.runtime.io.network.ConnectionID; import org.apache.flink.runtime.io.network.ConnectionManager; import org.apache.flink.runtime.io.network.TaskEventDispatcher; +import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils; import org.apache.flink.runtime.io.network.buffer.BufferConsumer; import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel; @@ -198,6 +199,8 @@ public class InputGateConcurrentTest { private abstract static class Source { abstract void addBufferConsumer(BufferConsumer bufferConsumer) throws Exception; + + abstract void flush(); } private static class PipelinedSubpartitionSource extends Source { @@ -212,6 +215,11 @@ public class InputGateConcurrentTest { void addBufferConsumer(BufferConsumer bufferConsumer) throws Exception { partition.add(bufferConsumer); } + + @Override + void flush() { + partition.flush(); + } } private static class RemoteChannelSource extends Source { @@ -225,14 +233,19 @@ public class InputGateConcurrentTest { @Override void addBufferConsumer(BufferConsumer bufferConsumer) throws Exception { - checkState(bufferConsumer.isFinished(), "Handling of non finished buffers is not yet implemented"); try { - channel.onBuffer(bufferConsumer.build(), seq++, -1); + Buffer buffer = bufferConsumer.build(); + checkState(bufferConsumer.isFinished(), "Handling of non finished buffers is not yet implemented"); + channel.onBuffer(buffer, seq++, -1); } finally { bufferConsumer.close(); } } + + @Override + void flush() { + } } // ------------------------------------------------------------------------ @@ -248,6 +261,7 @@ public class InputGateConcurrentTest { private final int yieldAfter; ProducerThread(Source[] sources, int numTotal, int maxChunk, int yieldAfter) { + super("producer"); this.sources = sources; this.numTotal = numTotal; this.maxChunk = maxChunk; @@ -276,7 +290,10 @@ public class InputGateConcurrentTest { //noinspection CallToThreadYield Thread.yield(); } + } + for (Source source : sources) { + source.flush(); } } } @@ -287,6 +304,7 @@ public class InputGateConcurrentTest { private final int numBuffers; ConsumerThread(SingleInputGate gate, int numBuffers) { + super("consumer"); this.gate = gate; this.numBuffers = numBuffers; }
