[FLINK-8948][runtime] Fix IllegalStateException when closing StreamTask. This closes #5710.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b6bbd123 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b6bbd123 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b6bbd123 Branch: refs/heads/master Commit: b6bbd123c951d1f6e1cda108514d8e749c5ed033 Parents: 463b922 Author: Piotr Nowojski <[email protected]> Authored: Fri Mar 16 15:56:07 2018 +0100 Committer: zentol <[email protected]> Committed: Tue Mar 20 10:14:26 2018 +0100 ---------------------------------------------------------------------- .../runtime/io/network/buffer/BufferBuilder.java | 5 +++-- .../buffer/BufferBuilderAndConsumerTest.java | 16 ++++++++++++---- 2 files changed, 15 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/b6bbd123/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java index 63b60d2..305f184 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java @@ -99,10 +99,11 @@ public class BufferBuilder { * Mark this {@link BufferBuilder} and associated {@link BufferConsumer} as finished - no new data writes will be * allowed. * + * <p>This method should be idempotent to handle failures and task interruptions. Check FLINK-8948 for more details. + * * @return number of written bytes. 
*/ public int finish() { - checkState(!isFinished()); positionMarker.markFinished(); commit(); return getWrittenBytes(); @@ -125,7 +126,7 @@ public class BufferBuilder { return memorySegment.size(); } - public int getWrittenBytes() { + private int getWrittenBytes() { return positionMarker.getCached(); } http://git-wip-us.apache.org/repos/asf/flink/blob/b6bbd123/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderAndConsumerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderAndConsumerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderAndConsumerTest.java index edf2bfe..b5d9da0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderAndConsumerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderAndConsumerTest.java @@ -202,22 +202,30 @@ public class BufferBuilderAndConsumerTest { for (int i = 0; i < writes; i++) { assertEquals(Integer.BYTES, bufferBuilder.appendAndCommit(toByteBuffer(42))); } + int expectedWrittenBytes = writes * Integer.BYTES; assertFalse(bufferBuilder.isFinished()); assertFalse(bufferConsumer.isFinished()); + assertEquals(0, bufferConsumer.getWrittenBytes()); bufferConsumer.build(); - assertFalse(bufferBuilder.isFinished()); assertFalse(bufferConsumer.isFinished()); + assertEquals(expectedWrittenBytes, bufferConsumer.getWrittenBytes()); - bufferBuilder.finish(); - + int actualWrittenBytes = bufferBuilder.finish(); + assertEquals(expectedWrittenBytes, actualWrittenBytes); assertTrue(bufferBuilder.isFinished()); assertFalse(bufferConsumer.isFinished()); + assertEquals(expectedWrittenBytes, bufferConsumer.getWrittenBytes()); - bufferConsumer.build(); + actualWrittenBytes = bufferBuilder.finish(); + assertEquals(expectedWrittenBytes, actualWrittenBytes); + 
assertTrue(bufferBuilder.isFinished()); + assertFalse(bufferConsumer.isFinished()); + assertEquals(expectedWrittenBytes, bufferConsumer.getWrittenBytes()); + assertEquals(0, bufferConsumer.build().getSize()); assertTrue(bufferConsumer.isFinished()); }
