[FLINK-8948][runtime] Fix IllegalStateException when closing StreamTask

This closes #5710.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b6bbd123
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b6bbd123
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b6bbd123

Branch: refs/heads/master
Commit: b6bbd123c951d1f6e1cda108514d8e749c5ed033
Parents: 463b922
Author: Piotr Nowojski <[email protected]>
Authored: Fri Mar 16 15:56:07 2018 +0100
Committer: zentol <[email protected]>
Committed: Tue Mar 20 10:14:26 2018 +0100

----------------------------------------------------------------------
 .../runtime/io/network/buffer/BufferBuilder.java    |  5 +++--
 .../buffer/BufferBuilderAndConsumerTest.java        | 16 ++++++++++++----
 2 files changed, 15 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b6bbd123/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
index 63b60d2..305f184 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
@@ -99,10 +99,11 @@ public class BufferBuilder {
         * Mark this {@link BufferBuilder} and associated {@link BufferConsumer} as finished - no new data writes will be
         * allowed.
         *
+        * <p>This method should be idempotent to handle failures and task interruptions. Check FLINK-8948 for more details.
+        *
         * @return number of written bytes.
         */
        public int finish() {
-               checkState(!isFinished());
                positionMarker.markFinished();
                commit();
                return getWrittenBytes();
@@ -125,7 +126,7 @@ public class BufferBuilder {
                return memorySegment.size();
        }
 
-       public int getWrittenBytes() {
+       private int getWrittenBytes() {
                return positionMarker.getCached();
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b6bbd123/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderAndConsumerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderAndConsumerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderAndConsumerTest.java
index edf2bfe..b5d9da0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderAndConsumerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderAndConsumerTest.java
@@ -202,22 +202,30 @@ public class BufferBuilderAndConsumerTest {
                for (int i = 0; i < writes; i++) {
                        assertEquals(Integer.BYTES, bufferBuilder.appendAndCommit(toByteBuffer(42)));
                }
+               int expectedWrittenBytes = writes * Integer.BYTES;
 
                assertFalse(bufferBuilder.isFinished());
                assertFalse(bufferConsumer.isFinished());
+               assertEquals(0, bufferConsumer.getWrittenBytes());
 
                bufferConsumer.build();
-
                assertFalse(bufferBuilder.isFinished());
                assertFalse(bufferConsumer.isFinished());
+               assertEquals(expectedWrittenBytes, bufferConsumer.getWrittenBytes());
 
-               bufferBuilder.finish();
-
+               int actualWrittenBytes = bufferBuilder.finish();
+               assertEquals(expectedWrittenBytes, actualWrittenBytes);
                assertTrue(bufferBuilder.isFinished());
                assertFalse(bufferConsumer.isFinished());
+               assertEquals(expectedWrittenBytes, bufferConsumer.getWrittenBytes());
 
-               bufferConsumer.build();
+               actualWrittenBytes = bufferBuilder.finish();
+               assertEquals(expectedWrittenBytes, actualWrittenBytes);
+               assertTrue(bufferBuilder.isFinished());
+               assertFalse(bufferConsumer.isFinished());
+               assertEquals(expectedWrittenBytes, bufferConsumer.getWrittenBytes());
 
+               assertEquals(0, bufferConsumer.build().getSize());
                assertTrue(bufferConsumer.isFinished());
        }
 

Reply via email to