This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5fd01eacd2e49673b0ff1532d0798844639569de
Author: Roman Khachatryan <[email protected]>
AuthorDate: Wed May 6 17:55:48 2020 +0200

    [FLINK-17547][task][hotfix] Extract SpanningWrapper
    from SpillingAdaptiveSpanningRecordDeserializer (static inner class).
    As it is, no logical changes.
---
 .../network/api/serialization/SpanningWrapper.java | 297 +++++++++++++++++++++
 ...SpillingAdaptiveSpanningRecordDeserializer.java | 278 -------------------
 2 files changed, 297 insertions(+), 278 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java
new file mode 100644
index 0000000..e59363f
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api.serialization;
+
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.StringUtils;
+
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.FileChannel;
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.Random;
+
+final class SpanningWrapper {
+
+       private static final int THRESHOLD_FOR_SPILLING = 5 * 1024 * 1024; // 5 
MiBytes
+
+       private final byte[] initialBuffer = new byte[1024];
+
+       private final String[] tempDirs;
+
+       private final Random rnd = new Random();
+
+       private final DataInputDeserializer serializationReadBuffer;
+
+       private final ByteBuffer lengthBuffer;
+
+       private FileChannel spillingChannel;
+
+       private byte[] buffer;
+
+       private int recordLength;
+
+       private int accumulatedRecordBytes;
+
+       private MemorySegment leftOverData;
+
+       private int leftOverStart;
+
+       private int leftOverLimit;
+
+       private File spillFile;
+
+       private DataInputViewStreamWrapper spillFileReader;
+
+       public SpanningWrapper(String[] tempDirs) {
+               this.tempDirs = tempDirs;
+
+               this.lengthBuffer = ByteBuffer.allocate(4);
+               this.lengthBuffer.order(ByteOrder.BIG_ENDIAN);
+
+               this.recordLength = -1;
+
+               this.serializationReadBuffer = new DataInputDeserializer();
+               this.buffer = initialBuffer;
+       }
+
+       void initializeWithPartialRecord(NonSpanningWrapper partial, int 
nextRecordLength) throws IOException {
+               // set the length and copy what is available to the buffer
+               this.recordLength = nextRecordLength;
+
+               final int numBytesChunk = partial.remaining();
+
+               if (nextRecordLength > THRESHOLD_FOR_SPILLING) {
+                       // create a spilling channel and put the data there
+                       this.spillingChannel = createSpillingChannel();
+
+                       ByteBuffer toWrite = 
partial.segment.wrap(partial.position, numBytesChunk);
+                       FileUtils.writeCompletely(this.spillingChannel, 
toWrite);
+               }
+               else {
+                       // collect in memory
+                       ensureBufferCapacity(nextRecordLength);
+                       partial.segment.get(partial.position, buffer, 0, 
numBytesChunk);
+               }
+
+               this.accumulatedRecordBytes = numBytesChunk;
+       }
+
+       void initializeWithPartialLength(NonSpanningWrapper partial) throws 
IOException {
+               // copy what we have to the length buffer
+               partial.segment.get(partial.position, this.lengthBuffer, 
partial.remaining());
+       }
+
+       void addNextChunkFromMemorySegment(MemorySegment segment, int offset, 
int numBytes) throws IOException {
+               int segmentPosition = offset;
+               int segmentRemaining = numBytes;
+               // check where to go. if we have a partial length, we need to 
complete it first
+               if (this.lengthBuffer.position() > 0) {
+                       int toPut = Math.min(this.lengthBuffer.remaining(), 
segmentRemaining);
+                       segment.get(segmentPosition, this.lengthBuffer, toPut);
+                       // did we complete the length?
+                       if (this.lengthBuffer.hasRemaining()) {
+                               return;
+                       } else {
+                               this.recordLength = this.lengthBuffer.getInt(0);
+
+                               this.lengthBuffer.clear();
+                               segmentPosition += toPut;
+                               segmentRemaining -= toPut;
+                               if (this.recordLength > THRESHOLD_FOR_SPILLING) 
{
+                                       this.spillingChannel = 
createSpillingChannel();
+                               } else {
+                                       ensureBufferCapacity(this.recordLength);
+                               }
+                       }
+               }
+
+               // copy as much as we need or can for this next spanning record
+               int needed = this.recordLength - this.accumulatedRecordBytes;
+               int toCopy = Math.min(needed, segmentRemaining);
+
+               if (spillingChannel != null) {
+                       // spill to file
+                       ByteBuffer toWrite = segment.wrap(segmentPosition, 
toCopy);
+                       FileUtils.writeCompletely(this.spillingChannel, 
toWrite);
+               } else {
+                       segment.get(segmentPosition, buffer, 
this.accumulatedRecordBytes, toCopy);
+               }
+
+               this.accumulatedRecordBytes += toCopy;
+
+               if (toCopy < segmentRemaining) {
+                       // there is more data in the segment
+                       this.leftOverData = segment;
+                       this.leftOverStart = segmentPosition + toCopy;
+                       this.leftOverLimit = numBytes + offset;
+               }
+
+               if (accumulatedRecordBytes == recordLength) {
+                       // we have the full record
+                       if (spillingChannel == null) {
+                               this.serializationReadBuffer.setBuffer(buffer, 
0, recordLength);
+                       }
+                       else {
+                               spillingChannel.close();
+
+                               BufferedInputStream inStream = new 
BufferedInputStream(new FileInputStream(spillFile), 2 * 1024 * 1024);
+                               this.spillFileReader = new 
DataInputViewStreamWrapper(inStream);
+                       }
+               }
+       }
+
+       Optional<MemorySegment> getUnconsumedSegment() throws IOException {
+               // for the case of only partial length, no data
+               final int position = lengthBuffer.position();
+               if (position > 0) {
+                       MemorySegment segment = 
MemorySegmentFactory.allocateUnpooledSegment(position);
+                       lengthBuffer.position(0);
+                       segment.put(0, lengthBuffer, position);
+                       return Optional.of(segment);
+               }
+
+               // for the case of full length, partial data in buffer
+               if (recordLength > THRESHOLD_FOR_SPILLING) {
+                       throw new UnsupportedOperationException("Unaligned 
checkpoint currently do not support spilled " +
+                               "records.");
+               } else if (recordLength != -1) {
+                       int leftOverSize = leftOverLimit - leftOverStart;
+                       int unconsumedSize = Integer.BYTES + 
accumulatedRecordBytes + leftOverSize;
+                       DataOutputSerializer serializer = new 
DataOutputSerializer(unconsumedSize);
+                       serializer.writeInt(recordLength);
+                       serializer.write(buffer, 0, accumulatedRecordBytes);
+                       if (leftOverData != null) {
+                               serializer.write(leftOverData, leftOverStart, 
leftOverSize);
+                       }
+                       MemorySegment segment = 
MemorySegmentFactory.allocateUnpooledSegment(unconsumedSize);
+                       segment.put(0, serializer.getSharedBuffer(), 0, 
segment.size());
+                       return Optional.of(segment);
+               }
+
+               // for the case of no remaining partial length or data
+               return Optional.empty();
+       }
+
+       void moveRemainderToNonSpanningDeserializer(NonSpanningWrapper 
deserializer) {
+               deserializer.clear();
+
+               if (leftOverData != null) {
+                       deserializer.initializeFromMemorySegment(leftOverData, 
leftOverStart, leftOverLimit);
+               }
+       }
+
+       boolean hasFullRecord() {
+               return this.recordLength >= 0 && this.accumulatedRecordBytes >= 
this.recordLength;
+       }
+
+       int getNumGatheredBytes() {
+               return this.accumulatedRecordBytes + (this.recordLength >= 0 ? 
4 : lengthBuffer.position());
+       }
+
+       public void clear() {
+               this.buffer = initialBuffer;
+               this.serializationReadBuffer.releaseArrays();
+
+               this.recordLength = -1;
+               this.lengthBuffer.clear();
+               this.leftOverData = null;
+               this.leftOverStart = 0;
+               this.leftOverLimit = 0;
+               this.accumulatedRecordBytes = 0;
+
+               if (spillingChannel != null) {
+                       try {
+                               spillingChannel.close();
+                       }
+                       catch (Throwable t) {
+                               // ignore
+                       }
+                       spillingChannel = null;
+               }
+               if (spillFileReader != null) {
+                       try {
+                               spillFileReader.close();
+                       }
+                       catch (Throwable t) {
+                               // ignore
+                       }
+                       spillFileReader = null;
+               }
+               if (spillFile != null) {
+                       spillFile.delete();
+                       spillFile = null;
+               }
+       }
+
+       public DataInputView getInputView() {
+               if (spillFileReader == null) {
+                       return serializationReadBuffer;
+               }
+               else {
+                       return spillFileReader;
+               }
+       }
+
+       private void ensureBufferCapacity(int minLength) {
+               if (buffer.length < minLength) {
+                       byte[] newBuffer = new byte[Math.max(minLength, 
buffer.length * 2)];
+                       System.arraycopy(buffer, 0, newBuffer, 0, 
accumulatedRecordBytes);
+                       buffer = newBuffer;
+               }
+       }
+
+       @SuppressWarnings("resource")
+       private FileChannel createSpillingChannel() throws IOException {
+               if (spillFile != null) {
+                       throw new IllegalStateException("Spilling file already 
exists.");
+               }
+
+               // try to find a unique file name for the spilling channel
+               int maxAttempts = 10;
+               for (int attempt = 0; attempt < maxAttempts; attempt++) {
+                       String directory = 
tempDirs[rnd.nextInt(tempDirs.length)];
+                       spillFile = new File(directory, randomString(rnd) + 
".inputchannel");
+                       if (spillFile.createNewFile()) {
+                               return new RandomAccessFile(spillFile, 
"rw").getChannel();
+                       }
+               }
+
+               throw new IOException(
+                       "Could not find a unique file channel name in '" + 
Arrays.toString(tempDirs) +
+                               "' for spilling large records during 
deserialization.");
+       }
+
+       private static String randomString(Random random) {
+               final byte[] bytes = new byte[20];
+               random.nextBytes(bytes);
+               return StringUtils.byteToHexString(bytes);
+       }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
index 5003e78..f20fbc9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
@@ -19,29 +19,13 @@
 package org.apache.flink.runtime.io.network.api.serialization;
 
 import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.core.memory.DataInputDeserializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
-import org.apache.flink.util.FileUtils;
-import org.apache.flink.util.StringUtils;
 
-import java.io.BufferedInputStream;
-import java.io.File;
-import java.io.FileInputStream;
 import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.nio.channels.FileChannel;
-import java.util.Arrays;
 import java.util.Optional;
-import java.util.Random;
 
 /**
  * @param <T> The type of the record to be deserialized.
@@ -54,8 +38,6 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T 
extends IOReadableWrit
                                        "(Value or Writable), check their 
serialization methods. If you are using a " +
                                        "Kryo-serialized type, check the 
corresponding Kryo serializer.";
 
-       private static final int THRESHOLD_FOR_SPILLING = 5 * 1024 * 1024; // 5 
MiBytes
-
        private final NonSpanningWrapper nonSpanningWrapper;
 
        private final SpanningWrapper spanningWrapper;
@@ -179,264 +161,4 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T 
extends IOReadableWrit
                return this.nonSpanningWrapper.remaining() > 0 || 
this.spanningWrapper.getNumGatheredBytes() > 0;
        }
 
-
-       // 
-----------------------------------------------------------------------------------------------------------------
-
-       // 
-----------------------------------------------------------------------------------------------------------------
-
-       private static final class SpanningWrapper {
-
-               private final byte[] initialBuffer = new byte[1024];
-
-               private final String[] tempDirs;
-
-               private final Random rnd = new Random();
-
-               private final DataInputDeserializer serializationReadBuffer;
-
-               private final ByteBuffer lengthBuffer;
-
-               private FileChannel spillingChannel;
-
-               private byte[] buffer;
-
-               private int recordLength;
-
-               private int accumulatedRecordBytes;
-
-               private MemorySegment leftOverData;
-
-               private int leftOverStart;
-
-               private int leftOverLimit;
-
-               private File spillFile;
-
-               private DataInputViewStreamWrapper spillFileReader;
-
-               public SpanningWrapper(String[] tempDirs) {
-                       this.tempDirs = tempDirs;
-
-                       this.lengthBuffer = ByteBuffer.allocate(4);
-                       this.lengthBuffer.order(ByteOrder.BIG_ENDIAN);
-
-                       this.recordLength = -1;
-
-                       this.serializationReadBuffer = new 
DataInputDeserializer();
-                       this.buffer = initialBuffer;
-               }
-
-               private void initializeWithPartialRecord(NonSpanningWrapper 
partial, int nextRecordLength) throws IOException {
-                       // set the length and copy what is available to the 
buffer
-                       this.recordLength = nextRecordLength;
-
-                       final int numBytesChunk = partial.remaining();
-
-                       if (nextRecordLength > THRESHOLD_FOR_SPILLING) {
-                               // create a spilling channel and put the data 
there
-                               this.spillingChannel = createSpillingChannel();
-
-                               ByteBuffer toWrite = 
partial.segment.wrap(partial.position, numBytesChunk);
-                               FileUtils.writeCompletely(this.spillingChannel, 
toWrite);
-                       }
-                       else {
-                               // collect in memory
-                               ensureBufferCapacity(nextRecordLength);
-                               partial.segment.get(partial.position, buffer, 
0, numBytesChunk);
-                       }
-
-                       this.accumulatedRecordBytes = numBytesChunk;
-               }
-
-               private void initializeWithPartialLength(NonSpanningWrapper 
partial) throws IOException {
-                       // copy what we have to the length buffer
-                       partial.segment.get(partial.position, 
this.lengthBuffer, partial.remaining());
-               }
-
-               private void addNextChunkFromMemorySegment(MemorySegment 
segment, int offset, int numBytes) throws IOException {
-                       int segmentPosition = offset;
-                       int segmentRemaining = numBytes;
-                       // check where to go. if we have a partial length, we 
need to complete it first
-                       if (this.lengthBuffer.position() > 0) {
-                               int toPut = 
Math.min(this.lengthBuffer.remaining(), segmentRemaining);
-                               segment.get(segmentPosition, this.lengthBuffer, 
toPut);
-                               // did we complete the length?
-                               if (this.lengthBuffer.hasRemaining()) {
-                                       return;
-                               } else {
-                                       this.recordLength = 
this.lengthBuffer.getInt(0);
-
-                                       this.lengthBuffer.clear();
-                                       segmentPosition += toPut;
-                                       segmentRemaining -= toPut;
-                                       if (this.recordLength > 
THRESHOLD_FOR_SPILLING) {
-                                               this.spillingChannel = 
createSpillingChannel();
-                                       } else {
-                                               
ensureBufferCapacity(this.recordLength);
-                                       }
-                               }
-                       }
-
-                       // copy as much as we need or can for this next 
spanning record
-                       int needed = this.recordLength - 
this.accumulatedRecordBytes;
-                       int toCopy = Math.min(needed, segmentRemaining);
-
-                       if (spillingChannel != null) {
-                               // spill to file
-                               ByteBuffer toWrite = 
segment.wrap(segmentPosition, toCopy);
-                               FileUtils.writeCompletely(this.spillingChannel, 
toWrite);
-                       } else {
-                               segment.get(segmentPosition, buffer, 
this.accumulatedRecordBytes, toCopy);
-                       }
-
-                       this.accumulatedRecordBytes += toCopy;
-
-                       if (toCopy < segmentRemaining) {
-                               // there is more data in the segment
-                               this.leftOverData = segment;
-                               this.leftOverStart = segmentPosition + toCopy;
-                               this.leftOverLimit = numBytes + offset;
-                       }
-
-                       if (accumulatedRecordBytes == recordLength) {
-                               // we have the full record
-                               if (spillingChannel == null) {
-                                       
this.serializationReadBuffer.setBuffer(buffer, 0, recordLength);
-                               }
-                               else {
-                                       spillingChannel.close();
-
-                                       BufferedInputStream inStream = new 
BufferedInputStream(new FileInputStream(spillFile), 2 * 1024 * 1024);
-                                       this.spillFileReader = new 
DataInputViewStreamWrapper(inStream);
-                               }
-                       }
-               }
-
-               Optional<MemorySegment> getUnconsumedSegment() throws 
IOException {
-                       // for the case of only partial length, no data
-                       final int position = lengthBuffer.position();
-                       if (position > 0) {
-                               MemorySegment segment = 
MemorySegmentFactory.allocateUnpooledSegment(position);
-                               lengthBuffer.position(0);
-                               segment.put(0, lengthBuffer, position);
-                               return Optional.of(segment);
-                       }
-
-                       // for the case of full length, partial data in buffer
-                       if (recordLength > THRESHOLD_FOR_SPILLING) {
-                               throw new 
UnsupportedOperationException("Unaligned checkpoint currently do not support 
spilled " +
-                                       "records.");
-                       } else if (recordLength != -1) {
-                               int leftOverSize = leftOverLimit - 
leftOverStart;
-                               int unconsumedSize = Integer.BYTES + 
accumulatedRecordBytes + leftOverSize;
-                               DataOutputSerializer serializer = new 
DataOutputSerializer(unconsumedSize);
-                               serializer.writeInt(recordLength);
-                               serializer.write(buffer, 0, 
accumulatedRecordBytes);
-                               if (leftOverData != null) {
-                                       serializer.write(leftOverData, 
leftOverStart, leftOverSize);
-                               }
-                               MemorySegment segment = 
MemorySegmentFactory.allocateUnpooledSegment(unconsumedSize);
-                               segment.put(0, serializer.getSharedBuffer(), 0, 
segment.size());
-                               return Optional.of(segment);
-                       }
-
-                       // for the case of no remaining partial length or data
-                       return Optional.empty();
-               }
-
-               private void 
moveRemainderToNonSpanningDeserializer(NonSpanningWrapper deserializer) {
-                       deserializer.clear();
-
-                       if (leftOverData != null) {
-                               
deserializer.initializeFromMemorySegment(leftOverData, leftOverStart, 
leftOverLimit);
-                       }
-               }
-
-               private boolean hasFullRecord() {
-                       return this.recordLength >= 0 && 
this.accumulatedRecordBytes >= this.recordLength;
-               }
-
-               private int getNumGatheredBytes() {
-                       return this.accumulatedRecordBytes + (this.recordLength 
>= 0 ? 4 : lengthBuffer.position());
-               }
-
-               public void clear() {
-                       this.buffer = initialBuffer;
-                       this.serializationReadBuffer.releaseArrays();
-
-                       this.recordLength = -1;
-                       this.lengthBuffer.clear();
-                       this.leftOverData = null;
-                       this.leftOverStart = 0;
-                       this.leftOverLimit = 0;
-                       this.accumulatedRecordBytes = 0;
-
-                       if (spillingChannel != null) {
-                               try {
-                                       spillingChannel.close();
-                               }
-                               catch (Throwable t) {
-                                       // ignore
-                               }
-                               spillingChannel = null;
-                       }
-                       if (spillFileReader != null) {
-                               try {
-                                       spillFileReader.close();
-                               }
-                               catch (Throwable t) {
-                                       // ignore
-                               }
-                               spillFileReader = null;
-                       }
-                       if (spillFile != null) {
-                               spillFile.delete();
-                               spillFile = null;
-                       }
-               }
-
-               public DataInputView getInputView() {
-                       if (spillFileReader == null) {
-                               return serializationReadBuffer;
-                       }
-                       else {
-                               return spillFileReader;
-                       }
-               }
-
-               private void ensureBufferCapacity(int minLength) {
-                       if (buffer.length < minLength) {
-                               byte[] newBuffer = new byte[Math.max(minLength, 
buffer.length * 2)];
-                               System.arraycopy(buffer, 0, newBuffer, 0, 
accumulatedRecordBytes);
-                               buffer = newBuffer;
-                       }
-               }
-
-               @SuppressWarnings("resource")
-               private FileChannel createSpillingChannel() throws IOException {
-                       if (spillFile != null) {
-                               throw new IllegalStateException("Spilling file 
already exists.");
-                       }
-
-                       // try to find a unique file name for the spilling 
channel
-                       int maxAttempts = 10;
-                       for (int attempt = 0; attempt < maxAttempts; attempt++) 
{
-                               String directory = 
tempDirs[rnd.nextInt(tempDirs.length)];
-                               spillFile = new File(directory, 
randomString(rnd) + ".inputchannel");
-                               if (spillFile.createNewFile()) {
-                                       return new RandomAccessFile(spillFile, 
"rw").getChannel();
-                               }
-                       }
-
-                       throw new IOException(
-                               "Could not find a unique file channel name in 
'" + Arrays.toString(tempDirs) +
-                                       "' for spilling large records during 
deserialization.");
-               }
-
-               private static String randomString(Random random) {
-                       final byte[] bytes = new byte[20];
-                       random.nextBytes(bytes);
-                       return StringUtils.byteToHexString(bytes);
-               }
-       }
 }

Reply via email to