[hotfix][network] various minor improvements

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d30df346
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d30df346
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d30df346

Branch: refs/heads/master
Commit: d30df346b43180a5a5e90b84b0f19b7f379985e2
Parents: 81259ad
Author: Nico Kruber <[email protected]>
Authored: Tue Feb 20 18:07:02 2018 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Tue Feb 27 09:07:14 2018 +0100

----------------------------------------------------------------------
 .../flink/runtime/io/network/partition/PipelinedSubpartition.java  | 2 ++
 .../api/writer/AbstractCollectingResultPartitionWriter.java        | 2 +-
 .../runtime/io/network/partition/SpillableSubpartitionTest.java    | 2 --
 3 files changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d30df346/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index dcaa360..a9c6e57 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -265,6 +265,8 @@ class PipelinedSubpartition extends ResultSubpartition {
        }
 
        private int getNumberOfFinishedBuffers() {
+               assert Thread.holdsLock(buffers);
+
                if (buffers.size() == 1 && buffers.peekLast().isFinished()) {
                        return 1;
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/d30df346/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java
index 0324375..981ca56 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java
@@ -76,7 +76,7 @@ public abstract class AbstractCollectingResultPartitionWriter implements ResultP
                        Buffer buffer = bufferConsumer.build();
                        try {
                                deserializeBuffer(buffer);
-                               if (!bufferConsumers.peek().isFinished()) {
+                               if (!bufferConsumer.isFinished()) {
                                        break;
                                }
                                bufferConsumers.pop().close();

http://git-wip-us.apache.org/repos/asf/flink/blob/d30df346/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
index 43bcd31..a6be748 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
@@ -227,7 +227,6 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
                assertTrue(read.buffer().isBuffer());
                assertEquals(2, partition.getBuffersInBacklog());
                assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog());
-               assertNotSame(bufferConsumer, read);
                assertFalse(read.buffer().isRecycled());
                read.buffer().recycleBuffer();
                assertTrue(read.buffer().isRecycled());
@@ -239,7 +238,6 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
                assertTrue(read.buffer().isBuffer());
                assertEquals(1, partition.getBuffersInBacklog());
                assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog());
-               assertNotSame(bufferConsumer, read);
                assertFalse(read.buffer().isRecycled());
                read.buffer().recycleBuffer();
                assertTrue(read.buffer().isRecycled());

Reply via email to