This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit f0dc60eb2eb302f11973282cca3c7d021048a692 Author: Stephan Ewen <[email protected]> AuthorDate: Sat Jun 8 16:10:40 2019 +0200 [hotfix] [network] Rename BoundedBlockingSubpartitionReader 'memory' parameter --- .../network/partition/BoundedBlockingSubpartition.java | 18 +++++++++--------- .../partition/BoundedBlockingSubpartitionReader.java | 14 +++++++------- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java index 76c7a2d..756b0af 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java @@ -72,7 +72,7 @@ final class BoundedBlockingSubpartition extends ResultSubpartition { private BufferConsumer currentBuffer; /** The memory that we store the data in, via a memory mapped file. */ - private final MemoryMappedBuffers memory; + private final MemoryMappedBuffers data; /** All created and not yet released readers. */ @GuardedBy("lock") @@ -108,11 +108,11 @@ final class BoundedBlockingSubpartition extends ResultSubpartition { BoundedBlockingSubpartition( int index, ResultPartition parent, - MemoryMappedBuffers memory) throws IOException { + MemoryMappedBuffers data) throws IOException { super(index, parent); - this.memory = checkNotNull(memory); + this.data = checkNotNull(data); this.readers = new HashSet<>(); } @@ -166,7 +166,7 @@ final class BoundedBlockingSubpartition extends ResultSubpartition { try { final Buffer buffer = bufferConsumer.build(); try { - memory.writeBuffer(buffer); + data.writeBuffer(buffer); numBuffersAndEventsWritten++; if (buffer.isBuffer()) { @@ -190,7 +190,7 @@ final class BoundedBlockingSubpartition extends ResultSubpartition { isFinished = true; flushCurrentBuffer(); writeAndCloseBufferConsumer(EventSerializer.toBufferConsumer(EndOfPartitionEvent.INSTANCE)); - memory.finishWrite(); + data.finishWrite(); } @Override @@ -215,9 +215,9 @@ final class BoundedBlockingSubpartition extends ResultSubpartition { availability.notifyDataAvailable(); - final MemoryMappedBuffers.BufferSlicer memoryReader = memory.getFullBuffers(); + final MemoryMappedBuffers.BufferSlicer dataReader = data.getFullBuffers(); final BoundedBlockingSubpartitionReader reader = new BoundedBlockingSubpartitionReader( - this, memoryReader, numDataBuffersWritten); + this, dataReader, numDataBuffersWritten); readers.add(reader); return reader; } @@ -240,7 +240,7 @@ final class BoundedBlockingSubpartition extends ResultSubpartition { // To avoid segmentation faults, we need to wait until all readers have been released. if (readers.isEmpty()) { - memory.close(); + data.close(); } } @@ -265,7 +265,7 @@ final class BoundedBlockingSubpartition extends ResultSubpartition { @Override protected long getTotalNumberOfBytes() { - return memory.getSize(); + return data.getSize(); } int getBuffersInBacklog() { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java index d6c6834..07a999d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java @@ -44,7 +44,7 @@ final class BoundedBlockingSubpartitionReader implements ResultSubpartitionView /** The reader/decoder to the memory mapped region with the data we currently read from. * Null once the reader empty or disposed.*/ @Nullable - private BufferSlicer memory; + private BufferSlicer data; /** The remaining number of data buffers (not events) in the result. */ private int dataBufferBacklog; @@ -57,16 +57,16 @@ final class BoundedBlockingSubpartitionReader implements ResultSubpartitionView */ BoundedBlockingSubpartitionReader( BoundedBlockingSubpartition parent, - BufferSlicer memory, + BufferSlicer data, int numDataBuffers) { checkArgument(numDataBuffers >= 0); this.parent = checkNotNull(parent); - this.memory = checkNotNull(memory); + this.data = checkNotNull(data); this.dataBufferBacklog = numDataBuffers; - this.nextBuffer = memory.sliceNextBuffer(); + this.nextBuffer = data.sliceNextBuffer(); } @Nullable @@ -83,8 +83,8 @@ final class BoundedBlockingSubpartitionReader implements ResultSubpartitionView dataBufferBacklog--; } - assert memory != null; - nextBuffer = memory.sliceNextBuffer(); + assert data != null; + nextBuffer = data.sliceNextBuffer(); return BufferAndBacklog.fromBufferAndLookahead(current, nextBuffer, dataBufferBacklog); } @@ -106,7 +106,7 @@ final class BoundedBlockingSubpartitionReader implements ResultSubpartitionView // nulling these fields means thet read method and will fail fast nextBuffer = null; - memory = null; + data = null; // Notify the parent that this one is released. This allows the parent to // eventually release all resources (when all readers are done and the
