This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2ed38c12be7151ab49e9cf2b4e2d8138f1ae4c62
Author: Roman Khachatryan <[email protected]>
AuthorDate: Thu May 7 16:48:47 2020 +0200

    [FLINK-17547][task] Implement getUnconsumedSegment for spilled buffers
---
 .../flink/core/memory/MemorySegmentFactory.java    |  28 ++++-
 .../org/apache/flink/util/CloseableIterator.java   |  32 ++++++
 .../core/memory/MemorySegmentFactoryTest.java      |  64 ++++++++++++
 .../apache/flink/util/CloseableIteratorTest.java   |  82 +++++++++++++++
 .../runtime/io/disk/FileBasedBufferIterator.java   |  90 ++++++++++++++++
 .../network/api/serialization/SpanningWrapper.java |  54 +++++++---
 .../api/serialization/SpanningWrapperTest.java     | 115 +++++++++++++++++++++
 7 files changed, 448 insertions(+), 17 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
 
b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
index ee301a1..f643bc4 100644
--- 
a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
+++ 
b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
@@ -27,6 +27,8 @@ import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+
 /**
  * A factory for (hybrid) memory segments ({@link HybridMemorySegment}).
  *
@@ -53,6 +55,31 @@ public final class MemorySegmentFactory {
        }
 
        /**
+        * Copies the given heap memory region and creates a new memory segment 
wrapping it.
+        *
+        * @param bytes The heap memory region.
+        * @param start starting position, inclusive
+        * @param end end position, exclusive
+        * @return A new memory segment that targets a copy of the given heap 
memory region.
+        * @throws IllegalArgumentException if start > end or end > bytes.length
+        */
+       public static MemorySegment wrapCopy(byte[] bytes, int start, int end) 
throws IllegalArgumentException {
+               checkArgument(end >= start);
+               checkArgument(end <= bytes.length);
+               MemorySegment copy = allocateUnpooledSegment(end - start);
+               copy.put(0, bytes, start, copy.size());
+               return copy;
+       }
+
+       /**
+        * Wraps the four bytes representing the given number with a {@link 
MemorySegment}.
+        * @see ByteBuffer#putInt(int)
+        */
+       public static MemorySegment wrapInt(int value) {
+               return 
wrap(ByteBuffer.allocate(Integer.BYTES).putInt(value).array());
+       }
+
+       /**
         * Allocates some unpooled memory and creates a new memory segment that 
represents
         * that memory.
         *
@@ -161,5 +188,4 @@ public final class MemorySegmentFactory {
        public static MemorySegment wrapOffHeapMemory(ByteBuffer memory) {
                return new HybridMemorySegment(memory, null);
        }
-
 }
diff --git 
a/flink-core/src/main/java/org/apache/flink/util/CloseableIterator.java 
b/flink-core/src/main/java/org/apache/flink/util/CloseableIterator.java
index cc51324..e0c5ec0 100644
--- a/flink-core/src/main/java/org/apache/flink/util/CloseableIterator.java
+++ b/flink-core/src/main/java/org/apache/flink/util/CloseableIterator.java
@@ -24,7 +24,9 @@ import java.util.ArrayDeque;
 import java.util.Collections;
 import java.util.Deque;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Queue;
 import java.util.function.Consumer;
 
 import static java.util.Arrays.asList;
@@ -80,6 +82,36 @@ public interface CloseableIterator<T> extends Iterator<T>, 
AutoCloseable {
                };
        }
 
+       static <T> CloseableIterator<T> flatten(CloseableIterator<T>... 
iterators) {
+               return new CloseableIterator<T>() {
+                       private final Queue<CloseableIterator<T>> queue = 
removeEmptyHead(new LinkedList<>(asList(iterators)));
+
+                       private Queue<CloseableIterator<T>> 
removeEmptyHead(Queue<CloseableIterator<T>> queue) {
+                               while (!queue.isEmpty() && 
!queue.peek().hasNext()) {
+                                       queue.poll();
+                               }
+                               return queue;
+                       }
+
+                       @Override
+                       public boolean hasNext() {
+                               removeEmptyHead(queue);
+                               return !queue.isEmpty();
+                       }
+
+                       @Override
+                       public T next() {
+                               removeEmptyHead(queue);
+                               return queue.peek().next();
+                       }
+
+                       @Override
+                       public void close() throws Exception {
+                               IOUtils.closeAll(iterators);
+                       }
+               };
+       }
+
        @SuppressWarnings("unchecked")
        static <T> CloseableIterator<T> empty() {
                return (CloseableIterator<T>) EMPTY_INSTANCE;
diff --git 
a/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentFactoryTest.java
 
b/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentFactoryTest.java
new file mode 100644
index 0000000..59c1d7e
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentFactoryTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.memory;
+
+import org.junit.Test;
+
+import static java.lang.System.arraycopy;
+import static org.junit.Assert.assertArrayEquals;
+
+/**
+ * {@link MemorySegmentFactory} test.
+ */
+public class MemorySegmentFactoryTest {
+
+       @Test
+       public void testWrapCopyChangingData() {
+               byte[] data = {1, 2, 3, 4, 5};
+               byte[] changingData = new byte[data.length];
+               arraycopy(data, 0, changingData, 0, data.length);
+               MemorySegment segment = 
MemorySegmentFactory.wrapCopy(changingData, 0, changingData.length);
+               changingData[0]++;
+               assertArrayEquals(data, segment.heapMemory);
+       }
+
+       @Test
+       public void testWrapPartialCopy() {
+               byte[] data = {1, 2, 3, 5, 6};
+               MemorySegment segment = MemorySegmentFactory.wrapCopy(data, 0, 
data.length / 2);
+               byte[] exp = new byte[segment.size()];
+               arraycopy(data, 0, exp, 0, exp.length);
+               assertArrayEquals(exp, segment.heapMemory);
+       }
+
+       @Test
+       public void testWrapCopyEmpty() {
+               MemorySegmentFactory.wrapCopy(new byte[0], 0, 0);
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void testWrapCopyWrongStart() {
+               MemorySegmentFactory.wrapCopy(new byte[]{1, 2, 3}, 10, 3);
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void testWrapCopyWrongEnd() {
+               MemorySegmentFactory.wrapCopy(new byte[]{1, 2, 3}, 0, 10);
+       }
+
+}
diff --git 
a/flink-core/src/test/java/org/apache/flink/util/CloseableIteratorTest.java 
b/flink-core/src/test/java/org/apache/flink/util/CloseableIteratorTest.java
new file mode 100644
index 0000000..e2d4d3f
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/util/CloseableIteratorTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertFalse;
+
+/**
+ * {@link CloseableIterator} test.
+ */
+@SuppressWarnings("unchecked")
+public class CloseableIteratorTest {
+
+       private static final String[] ELEMENTS = new String[]{"flink", "blink"};
+
+       @Test
+       public void testFlattenEmpty() throws Exception {
+               List<CloseableIterator<?>> iterators = asList(
+                               CloseableIterator.flatten(),
+                               
CloseableIterator.flatten(CloseableIterator.empty()),
+                               
CloseableIterator.flatten(CloseableIterator.flatten()));
+               for (CloseableIterator<?> i : iterators) {
+                       assertFalse(i.hasNext());
+                       i.close();
+               }
+       }
+
+       @Test
+       public void testFlattenIteration() {
+               CloseableIterator<String> iterator = CloseableIterator.flatten(
+                               CloseableIterator.ofElement(ELEMENTS[0], unused 
-> {
+                               }),
+                               CloseableIterator.ofElement(ELEMENTS[1], unused 
-> {
+                               })
+               );
+
+               List<String> iterated = new ArrayList<>();
+               iterator.forEachRemaining(iterated::add);
+               assertArrayEquals(ELEMENTS, iterated.toArray());
+       }
+
+       @Test(expected = TestException.class)
+       public void testFlattenErrorHandling() throws Exception {
+               List<String> closed = new ArrayList<>();
+               CloseableIterator<String> iterator = CloseableIterator.flatten(
+                               CloseableIterator.ofElement(ELEMENTS[0], e -> {
+                                       closed.add(e);
+                                       throw new TestException();
+                               }),
+                               CloseableIterator.ofElement(ELEMENTS[1], 
closed::add)
+               );
+               try {
+                       iterator.close();
+               } finally {
+                       assertArrayEquals(ELEMENTS, closed.toArray());
+               }
+       }
+
+       private static class TestException extends RuntimeException {
+       }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileBasedBufferIterator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileBasedBufferIterator.java
new file mode 100644
index 0000000..c7e1cd8
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileBasedBufferIterator.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.RefCountedFile;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.util.CloseableIterator;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import static org.apache.flink.core.memory.MemorySegmentFactory.wrap;
+import static 
org.apache.flink.runtime.io.network.buffer.Buffer.DataType.DATA_BUFFER;
+import static org.apache.flink.util.IOUtils.closeAll;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * {@link CloseableIterator} of {@link Buffer buffers} over file content.
+ */
+@Internal
+public class FileBasedBufferIterator implements CloseableIterator<Buffer> {
+
+       private final RefCountedFile file;
+       private final FileInputStream stream;
+       private final int bufferSize;
+
+       private int offset;
+       private int bytesToRead;
+
+       public FileBasedBufferIterator(RefCountedFile file, int bytesToRead, 
int bufferSize) throws FileNotFoundException {
+               checkNotNull(file);
+               checkArgument(bytesToRead >= 0);
+               checkArgument(bufferSize > 0);
+               this.stream = new FileInputStream(file.getFile());
+               this.file = file;
+               this.bufferSize = bufferSize;
+               this.bytesToRead = bytesToRead;
+               file.retain();
+       }
+
+       @Override
+       public boolean hasNext() {
+               return bytesToRead > 0;
+       }
+
+       @Override
+       public Buffer next() {
+               byte[] buffer = new byte[bufferSize];
+               int bytesRead = read(buffer);
+               checkState(bytesRead >= 0, "unexpected end of file, file = " + 
file.getFile() + ", offset=" + offset);
+               offset += bytesRead;
+               bytesToRead -= bytesRead;
+               return new NetworkBuffer(wrap(buffer), 
FreeingBufferRecycler.INSTANCE, DATA_BUFFER, bytesRead);
+       }
+
+       private int read(byte[] buffer) {
+               int limit = Math.min(buffer.length, bytesToRead);
+               try {
+                       return stream.read(buffer, offset, limit);
+               } catch (IOException e) {
+                       throw new RuntimeException(e);
+               }
+       }
+
+       @Override
+       public void close() throws Exception {
+               closeAll(stream, file::release);
+       }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java
index 9cffde3..45d6ad7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java
@@ -24,7 +24,10 @@ import 
org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.disk.FileBasedBufferIterator;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.StringUtils;
 
@@ -41,15 +44,18 @@ import java.util.Random;
 
 import static java.lang.Math.max;
 import static java.lang.Math.min;
+import static org.apache.flink.core.memory.MemorySegmentFactory.wrapCopy;
+import static org.apache.flink.core.memory.MemorySegmentFactory.wrapInt;
 import static 
org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.singleBufferIterator;
 import static 
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.LENGTH_BYTES;
+import static org.apache.flink.util.CloseableIterator.empty;
 import static org.apache.flink.util.FileUtils.writeCompletely;
 import static org.apache.flink.util.IOUtils.closeAllQuietly;
 
 final class SpanningWrapper {
 
-       private static final int THRESHOLD_FOR_SPILLING = 5 * 1024 * 1024; // 5 
MiBytes
-       private static final int FILE_BUFFER_SIZE = 2 * 1024 * 1024;
+       private static final int DEFAULT_THRESHOLD_FOR_SPILLING = 5 * 1024 * 
1024; // 5 MiBytes
+       private static final int DEFAULT_FILE_BUFFER_SIZE = 2 * 1024 * 1024;
 
        private final byte[] initialBuffer = new byte[1024];
 
@@ -61,6 +67,8 @@ final class SpanningWrapper {
 
        final ByteBuffer lengthBuffer;
 
+       private final int fileBufferSize;
+
        private FileChannel spillingChannel;
 
        private byte[] buffer;
@@ -79,16 +87,21 @@ final class SpanningWrapper {
 
        private DataInputViewStreamWrapper spillFileReader;
 
+       private int thresholdForSpilling;
+
        SpanningWrapper(String[] tempDirs) {
-               this.tempDirs = tempDirs;
+               this(tempDirs, DEFAULT_THRESHOLD_FOR_SPILLING, 
DEFAULT_FILE_BUFFER_SIZE);
+       }
 
+       SpanningWrapper(String[] tempDirectories, int threshold, int 
fileBufferSize) {
+               this.tempDirs = tempDirectories;
                this.lengthBuffer = ByteBuffer.allocate(LENGTH_BYTES);
                this.lengthBuffer.order(ByteOrder.BIG_ENDIAN);
-
                this.recordLength = -1;
-
                this.serializationReadBuffer = new DataInputDeserializer();
                this.buffer = initialBuffer;
+               this.thresholdForSpilling = threshold;
+               this.fileBufferSize = fileBufferSize;
        }
 
        /**
@@ -101,7 +114,7 @@ final class SpanningWrapper {
        }
 
        private boolean isAboveSpillingThreshold() {
-               return recordLength > THRESHOLD_FOR_SPILLING;
+               return recordLength > thresholdForSpilling;
        }
 
        void addNextChunkFromMemorySegment(MemorySegment segment, int offset, 
int numBytes) throws IOException {
@@ -137,7 +150,7 @@ final class SpanningWrapper {
                accumulatedRecordBytes += length;
                if (hasFullRecord()) {
                        spillingChannel.close();
-                       spillFileReader = new DataInputViewStreamWrapper(new 
BufferedInputStream(new FileInputStream(spillFile.getFile()), 
FILE_BUFFER_SIZE));
+                       spillFileReader = new DataInputViewStreamWrapper(new 
BufferedInputStream(new FileInputStream(spillFile.getFile()), fileBufferSize));
                }
        }
 
@@ -170,22 +183,26 @@ final class SpanningWrapper {
 
        CloseableIterator<Buffer> getUnconsumedSegment() throws IOException {
                if (isReadingLength()) {
-                       return singleBufferIterator(copyLengthBuffer());
+                       return 
singleBufferIterator(wrapCopy(lengthBuffer.array(), 0, 
lengthBuffer.position()));
                } else if (isAboveSpillingThreshold()) {
-                       throw new UnsupportedOperationException("Unaligned 
checkpoint currently do not support spilled records.");
+                       return createSpilledDataIterator();
                } else if (recordLength == -1) {
-                       return CloseableIterator.empty(); // no remaining 
partial length or data
+                       return empty(); // no remaining partial length or data
                } else {
                        return singleBufferIterator(copyDataBuffer());
                }
        }
 
-       private MemorySegment copyLengthBuffer() {
-               int position = lengthBuffer.position();
-               MemorySegment segment = 
MemorySegmentFactory.allocateUnpooledSegment(position);
-               lengthBuffer.position(0);
-               segment.put(0, lengthBuffer, position);
-               return segment;
+       @SuppressWarnings("unchecked")
+       private CloseableIterator<Buffer> createSpilledDataIterator() throws 
IOException {
+               if (spillingChannel != null && spillingChannel.isOpen()) {
+                       spillingChannel.force(false);
+               }
+               return CloseableIterator.flatten(
+                       toSingleBufferIterator(wrapInt(recordLength)),
+                       new FileBasedBufferIterator(spillFile, 
min(accumulatedRecordBytes, recordLength), fileBufferSize),
+                       leftOverData == null ? empty() : 
toSingleBufferIterator(wrapCopy(leftOverData.getArray(), leftOverStart, 
leftOverLimit))
+               );
        }
 
        private MemorySegment copyDataBuffer() throws IOException {
@@ -289,4 +306,9 @@ final class SpanningWrapper {
                return lengthBuffer.position() > 0;
        }
 
+       private static CloseableIterator<Buffer> 
toSingleBufferIterator(MemorySegment segment) {
+               NetworkBuffer buffer = new NetworkBuffer(segment, 
FreeingBufferRecycler.INSTANCE, Buffer.DataType.DATA_BUFFER, segment.size());
+               return CloseableIterator.ofElement(buffer, 
Buffer::recycleBuffer);
+       }
+
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapperTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapperTest.java
new file mode 100644
index 0000000..be57e21
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapperTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api.serialization;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.util.CloseableIterator;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import static org.apache.flink.core.memory.MemorySegmentFactory.wrap;
+import static 
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.LENGTH_BYTES;
+import static org.junit.Assert.assertArrayEquals;
+
+/**
+ * {@link SpanningWrapper} test.
+ */
+public class SpanningWrapperTest {
+
+       private static final Random random = new Random();
+
+       @Rule
+       public TemporaryFolder folder = new TemporaryFolder();
+
+       @Test
+       public void testLargeUnconsumedSegment() throws Exception {
+               int recordLen = 100;
+               int firstChunk = (int) (recordLen * .9);
+               int spillingThreshold = (int) (firstChunk * .9);
+
+               byte[] record1 = recordBytes(recordLen);
+               byte[] record2 = recordBytes(recordLen * 2);
+
+               SpanningWrapper spanningWrapper = new SpanningWrapper(new 
String[]{folder.newFolder().getAbsolutePath()}, spillingThreshold, recordLen);
+               spanningWrapper.transferFrom(wrapNonSpanning(record1, 
firstChunk), recordLen);
+               spanningWrapper.addNextChunkFromMemorySegment(wrap(record1), 
firstChunk, recordLen - firstChunk + LENGTH_BYTES);
+               spanningWrapper.addNextChunkFromMemorySegment(wrap(record2), 0, 
record2.length);
+
+               CloseableIterator<Buffer> unconsumedSegment = 
spanningWrapper.getUnconsumedSegment();
+
+               spanningWrapper.getInputView().readFully(new byte[recordLen], 
0, recordLen); // read out from file
+               spanningWrapper.transferLeftOverTo(new NonSpanningWrapper()); 
// clear any leftover
+               
spanningWrapper.transferFrom(wrapNonSpanning(recordBytes(recordLen), 
recordLen), recordLen); // overwrite with new data
+
+               assertArrayEquals(concat(record1, record2), 
toByteArray(unconsumedSegment));
+       }
+
+       private byte[] recordBytes(int recordLen) {
+               byte[] inputData = randomBytes(recordLen + LENGTH_BYTES);
+               for (int i = 0; i < Integer.BYTES; i++) {
+                       inputData[Integer.BYTES - i - 1] = (byte) (recordLen 
>>> i * 8);
+               }
+               return inputData;
+       }
+
+       private NonSpanningWrapper wrapNonSpanning(byte[] bytes, int len) {
+               NonSpanningWrapper nonSpanningWrapper = new 
NonSpanningWrapper();
+               MemorySegment segment = wrap(bytes);
+               nonSpanningWrapper.initializeFromMemorySegment(segment, 0, len);
+               nonSpanningWrapper.readInt(); // emulate read length performed 
in getNextRecord to move position
+               return nonSpanningWrapper;
+       }
+
+       private byte[] toByteArray(CloseableIterator<Buffer> unconsumed) {
+               final List<Buffer> buffers = new ArrayList<>();
+               try {
+                       unconsumed.forEachRemaining(buffers::add);
+                       byte[] result = new 
byte[buffers.stream().mapToInt(Buffer::readableBytes).sum()];
+                       int offset = 0;
+                       for (Buffer buffer : buffers) {
+                               int len = buffer.readableBytes();
+                               buffer.getNioBuffer(0, len).get(result, offset, 
len);
+                               offset += len;
+                       }
+                       return result;
+               } finally {
+                       buffers.forEach(Buffer::recycleBuffer);
+               }
+       }
+
+       private byte[] randomBytes(int length) {
+               byte[] inputData = new byte[length];
+               random.nextBytes(inputData);
+               return inputData;
+       }
+
+       private byte[] concat(byte[] input1, byte[] input2) {
+               byte[] expected = new byte[input1.length + input2.length];
+               System.arraycopy(input1, 0, expected, 0, input1.length);
+               System.arraycopy(input2, 0, expected, input1.length, 
input2.length);
+               return expected;
+       }
+
+}

Reply via email to