This is an automated email from the ASF dual-hosted git repository.
gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 66676d9bf8 New buffers (#13304)
66676d9bf8 is described below
commit 66676d9bf89326a1850f7872062bec0b3e5115ad
Author: Gonzalo Ortiz Jaureguizar <[email protected]>
AuthorDate: Mon Jun 24 13:00:58 2024 +0200
New buffers (#13304)
Add some new IO and buffer classes:
- DataBuffer, a new interface that contains most (if not all) methods of
PinotDataBuffer, which now implements this interface
- CompoundDataBuffer, a new class that wraps a list of DataBuffers and
offers a contiguous DataBuffer without copying them.
- PinotOutputStream, an OutputStream whose cursor can be moved.
- PinotInputStream, the same, but for InputStream.
- PagedPinotOutputStream, a PinotOutputStream that stores data into a list
of ByteBuffers. When the last ByteBuffer in the list is filled, a new one is
created.
- DataBufferPinotInputStream, an adaptor that takes a DataBuffer and shows
it as a PinotInputStream.
---
.../pinot/segment/spi/memory/BasePinotLBuffer.java | 2 +-
.../segment/spi/memory/CompoundDataBuffer.java | 674 +++++++++++++++++++++
.../pinot/segment/spi/memory/DataBuffer.java | 288 +++++++++
.../spi/memory/DataBufferPinotInputStream.java | 244 ++++++++
.../spi/memory/NonNativePinotDataBuffer.java | 2 +-
.../segment/spi/memory/PagedPinotOutputStream.java | 406 +++++++++++++
.../pinot/segment/spi/memory/PinotByteBuffer.java | 14 +-
.../pinot/segment/spi/memory/PinotDataBuffer.java | 194 ++++--
.../pinot/segment/spi/memory/PinotInputStream.java | 107 ++++
.../segment/spi/memory/PinotOutputStream.java | 142 +++++
.../segment/spi/memory/CompoundDataBufferTest.java | 313 ++++++++++
.../spi/memory/DataBufferPinotInputStreamTest.java | 208 +++++++
.../spi/memory/PagedPinotOutputStreamTest.java | 314 ++++++++++
13 files changed, 2850 insertions(+), 58 deletions(-)
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/BasePinotLBuffer.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/BasePinotLBuffer.java
index 0718c99082..9ba50911d2 100644
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/BasePinotLBuffer.java
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/BasePinotLBuffer.java
@@ -62,7 +62,7 @@ public abstract class BasePinotLBuffer extends
PinotDataBuffer {
}
@Override
- public void copyTo(long offset, PinotDataBuffer buffer, long destOffset,
long size) {
+ public void copyTo(long offset, DataBuffer buffer, long destOffset, long
size) {
if (buffer instanceof BasePinotLBuffer) {
_buffer.copyTo(offset, ((BasePinotLBuffer) buffer)._buffer, destOffset,
size);
} else {
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/CompoundDataBuffer.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/CompoundDataBuffer.java
new file mode 100644
index 0000000000..69e3522128
--- /dev/null
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/CompoundDataBuffer.java
@@ -0,0 +1,674 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.memory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.BufferOverflowException;
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.ArrayList;
+import java.util.List;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+
+
+/**
+ * A {@link DataBuffer} that is composed of multiple {@link DataBuffer}s that define a single contiguous buffer.
+ * <p>
+ * While reads and writes can span multiple buffers, there may be a performance impact when doing so.
+ * Therefore it is recommended to try to wrap independent buffers.
+ * <p>
+ * Once this class is built, buffers cannot be added or removed.
+ * <p>
+ * Offset lookups remember the index of the last buffer accessed in a plain (non-volatile) field. That value is only a
+ * hint: any index read from it is a valid starting point for the lookup, so a stale value can only make the lookup
+ * slower, never wrong.
+ */
+public class CompoundDataBuffer implements DataBuffer {
+
+  /** The buffers that form this compound buffer, all of them with byte order {@link #_order}. */
+  private final DataBuffer[] _buffers;
+  /**
+   * The buffers exactly as they were given to the constructor. An entry differs from the one in {@link #_buffers}
+   * when it had to be converted to {@link #_order} through a view. Flushing or closing a view has no effect, so
+   * {@link #flush()} and {@link #close()} are delegated to these buffers instead.
+   */
+  private final DataBuffer[] _sourceBuffers;
+  /** {@code _bufferOffsets[i]} is the offset, in this compound buffer, of the first byte of {@code _buffers[i]}. */
+  private final long[] _bufferOffsets;
+  /** Lookup hint used by {@link #getBufferIndex(long)}. */
+  private int _lastBufferIndex = 0;
+  private final ByteOrder _order;
+  private final long _size;
+  private final boolean _owner;
+
+  /**
+   * Creates a compound buffer from the given buffers.
+   * <p>
+   * The array is copied: this constructor does not modify it and later changes to it do not affect this buffer.
+   *
+   * @param buffers The buffers that will be concatenated to form the compound buffer. None of them can be empty.
+   * @param order The byte order of the buffer. Buffers in the array that have a different byte order are accessed
+   *              through a view with this order.
+   * @param owner Whether this buffer owns the underlying buffers. If true, the underlying buffers will be released
+   *              when this buffer is closed.
+   * @throws IllegalArgumentException if any of the buffers is empty
+   */
+  public CompoundDataBuffer(DataBuffer[] buffers, ByteOrder order, boolean owner) {
+    _owner = owner;
+    _order = order;
+    _sourceBuffers = buffers.clone();
+    _buffers = new DataBuffer[buffers.length];
+    _bufferOffsets = new long[buffers.length];
+    long offset = 0;
+    for (int i = 0; i < _sourceBuffers.length; i++) {
+      DataBuffer buffer = _sourceBuffers[i];
+      long size = buffer.size();
+      if (size == 0) {
+        throw new IllegalArgumentException("Buffer at index " + i + " is empty");
+      }
+      // Converted views are only used for reads and writes; flush and close are sent to the source buffers.
+      _buffers[i] = buffer.order() == order ? buffer : buffer.view(0, size, order);
+      _bufferOffsets[i] = offset;
+      offset += size;
+    }
+    _size = offset;
+  }
+
+  /**
+   * Creates a compound buffer that wraps each of the given {@link ByteBuffer}s with {@link PinotByteBuffer#wrap}.
+   *
+   * @param buffers The buffers that will be concatenated to form the compound buffer. None of them can be empty.
+   * @param order The byte order of the compound buffer.
+   * @param owner Whether this buffer owns the underlying buffers.
+   */
+  public CompoundDataBuffer(ByteBuffer[] buffers, ByteOrder order, boolean owner) {
+    this(asDataBufferArray(buffers), order, owner);
+  }
+
+  /**
+   * Creates a compound buffer from the given buffers.
+   *
+   * @param buffers The buffers that will be concatenated to form the compound buffer. None of them can be empty.
+   * @param order The byte order of the buffer. Buffers in the list that have a different byte order are accessed
+   *              through a view with this order.
+   * @param owner Whether this buffer owns the underlying buffers. If true, the underlying buffers will be released
+   *              when this buffer is closed.
+   */
+  public CompoundDataBuffer(List<DataBuffer> buffers, ByteOrder order, boolean owner) {
+    this(buffers.toArray(new DataBuffer[0]), order, owner);
+  }
+
+  private static DataBuffer[] asDataBufferArray(ByteBuffer[] buffers) {
+    DataBuffer[] result = new DataBuffer[buffers.length];
+    for (int i = 0; i < buffers.length; i++) {
+      result[i] = PinotByteBuffer.wrap(buffers[i]);
+    }
+    return result;
+  }
+
+  /**
+   * Returns the index of the buffer that contains {@code offset}. Offsets at or beyond {@link #_size} map to the last
+   * buffer. There must be at least one buffer.
+   */
+  private int getBufferIndex(long offset) {
+    // this optimistically assumes that lookups are going to be in ascending order
+    // we don't care about concurrency here given that this is only used to speed up the lookup
+    int lastBufferIndex = _lastBufferIndex;
+    if (_bufferOffsets[lastBufferIndex] > offset) {
+      lastBufferIndex = 0;
+    }
+
+    for (int i = lastBufferIndex; i < _bufferOffsets.length; i++) {
+      if (offset < _bufferOffsets[i]) {
+        int result = i - 1;
+        _lastBufferIndex = result;
+        return result;
+      }
+    }
+    int result = _bufferOffsets.length - 1;
+    _lastBufferIndex = result;
+    return result;
+  }
+
+  /**
+   * Copies the range [offset, offset + length) into a new heap {@link ByteBuffer} with this buffer's byte order.
+   *
+   * @throws BufferUnderflowException if the range exceeds this buffer
+   */
+  private ByteBuffer copy(long offset, int length) {
+    // checked before allocating so that absurd lengths fail fast instead of allocating a huge array
+    if (offset + length > _size) {
+      throw new BufferUnderflowException();
+    }
+    byte[] result = new byte[length];
+    copyTo(offset, result, 0, length);
+    return ByteBuffer.wrap(result).order(_order);
+  }
+
+  @Override
+  public void readFrom(long offset, byte[] input, int srcOffset, int size) {
+    if (offset + size > _size) {
+      throw new BufferOverflowException();
+    }
+    if (size == 0) {
+      // nothing to write; also avoids looking up a buffer index when there are no buffers
+      return;
+    }
+
+    int bufferIndex = getBufferIndex(offset);
+    long inBufferIndex = offset - _bufferOffsets[bufferIndex];
+    int remaining = size;
+    while (remaining > 0) {
+      DataBuffer buffer = _buffers[bufferIndex];
+      int toRead = (int) Math.min(remaining, buffer.size() - inBufferIndex);
+      buffer.readFrom(inBufferIndex, input, srcOffset, toRead);
+
+      bufferIndex++;
+      remaining -= toRead;
+      srcOffset += toRead;
+      inBufferIndex = 0; // from now on we always write in the first position of the buffer
+    }
+  }
+
+  @Override
+  public void readFrom(long offset, ByteBuffer input) {
+    if (offset + input.remaining() > _size) {
+      throw new BufferOverflowException();
+    }
+    if (!input.hasRemaining()) {
+      return;
+    }
+
+    int startLimit = input.limit();
+    int bufferIndex = getBufferIndex(offset);
+    long inBufferIndex = offset - _bufferOffsets[bufferIndex];
+    try {
+      while (input.hasRemaining()) {
+        DataBuffer buffer = _buffers[bufferIndex];
+        int toRead = (int) Math.min(input.remaining(), buffer.size() - inBufferIndex);
+
+        // temporarily shrink the limit so the child buffer only consumes the bytes that fit in it
+        input.limit(input.position() + toRead);
+        buffer.readFrom(inBufferIndex, input);
+
+        input.position(input.limit());
+        input.limit(startLimit);
+        bufferIndex++;
+        inBufferIndex = 0; // from now on we always write in the first position of the buffer
+      }
+    } finally {
+      // restore the caller's limit even if a child buffer fails half way
+      input.limit(startLimit);
+    }
+  }
+
+  @Override
+  public void readFrom(long offset, File file, long srcOffset, long size)
+      throws IOException {
+    if (offset + size > _size) {
+      throw new BufferOverflowException();
+    }
+    if (size == 0) {
+      return;
+    }
+
+    int bufferIndex = getBufferIndex(offset);
+    long inBufferIndex = offset - _bufferOffsets[bufferIndex];
+    long fileOffset = srcOffset;
+    long remaining = size;
+    while (remaining > 0) {
+      DataBuffer buffer = _buffers[bufferIndex];
+      long toRead = Math.min(remaining, buffer.size() - inBufferIndex);
+      buffer.readFrom(inBufferIndex, file, fileOffset, toRead);
+
+      bufferIndex++;
+      remaining -= toRead;
+      fileOffset += toRead;
+      inBufferIndex = 0; // every buffer after the first one is written from its first byte
+    }
+  }
+
+  @Override
+  public void copyTo(long offset, byte[] buffer, int destOffset, int size) {
+    if (offset + size > _size) {
+      throw new BufferUnderflowException();
+    }
+    if (size == 0) {
+      return;
+    }
+
+    int bufferIndex = getBufferIndex(offset);
+    long inBufferIndex = offset - _bufferOffsets[bufferIndex];
+    int remaining = size;
+    while (remaining > 0) {
+      DataBuffer bufferToCopy = _buffers[bufferIndex];
+      int toCopy = (int) Math.min(remaining, bufferToCopy.size() - inBufferIndex);
+      bufferToCopy.copyTo(inBufferIndex, buffer, destOffset, toCopy);
+
+      bufferIndex++;
+      remaining -= toCopy;
+      destOffset += toCopy;
+      inBufferIndex = 0; // every buffer after the first one is read from its first byte
+    }
+  }
+
+  @Override
+  public void copyTo(long offset, DataBuffer buffer, long destOffset, long size) {
+    if (offset + size > _size) {
+      throw new BufferUnderflowException();
+    }
+    if (size == 0) {
+      return;
+    }
+
+    int bufferIndex = getBufferIndex(offset);
+    long inBufferIndex = offset - _bufferOffsets[bufferIndex];
+    long remaining = size;
+    while (remaining > 0) {
+      DataBuffer bufferToCopy = _buffers[bufferIndex];
+      long toCopy = Math.min(remaining, bufferToCopy.size() - inBufferIndex);
+      bufferToCopy.copyTo(inBufferIndex, buffer, destOffset, toCopy);
+
+      bufferIndex++;
+      remaining -= toCopy;
+      destOffset += toCopy;
+      inBufferIndex = 0; // every buffer after the first one is read from its first byte
+    }
+  }
+
+  @Override
+  public byte getByte(long offset) {
+    int bufferIndex = getBufferIndex(offset);
+    DataBuffer buffer = _buffers[bufferIndex];
+    long inBufferIndex = offset - _bufferOffsets[bufferIndex];
+    return buffer.getByte(inBufferIndex);
+  }
+
+  @Override
+  public void putByte(long offset, byte value) {
+    int bufferIndex = getBufferIndex(offset);
+    DataBuffer buffer = _buffers[bufferIndex];
+    long inBufferIndex = offset - _bufferOffsets[bufferIndex];
+    buffer.putByte(inBufferIndex, value);
+  }
+
+  // Multi-byte accessors: the fast path is used when the value lies entirely inside one buffer; otherwise the bytes
+  // are assembled in (or split from) a temporary heap ByteBuffer with this buffer's byte order.
+
+  @Override
+  public char getChar(long offset) {
+    int bufferIndex = getBufferIndex(offset);
+    DataBuffer buffer = _buffers[bufferIndex];
+    long inBufferIndex = offset - _bufferOffsets[bufferIndex];
+    if (inBufferIndex + Character.BYTES <= buffer.size()) {
+      // fast path
+      return buffer.getChar(inBufferIndex);
+    } else {
+      // slow path
+      ByteBuffer byteBuffer = copy(offset, Character.BYTES);
+      return byteBuffer.getChar();
+    }
+  }
+
+  @Override
+  public void putChar(long offset, char value) {
+    int bufferIndex = getBufferIndex(offset);
+    DataBuffer buffer = _buffers[bufferIndex];
+    long inBufferIndex = offset - _bufferOffsets[bufferIndex];
+    if (inBufferIndex + Character.BYTES <= buffer.size()) {
+      // fast path
+      buffer.putChar(inBufferIndex, value);
+    } else {
+      // slow path
+      ByteBuffer byteBuffer = ByteBuffer.allocate(Character.BYTES)
+          .order(_order)
+          .putChar(value);
+      byteBuffer.flip();
+      readFrom(offset, byteBuffer);
+    }
+  }
+
+  @Override
+  public short getShort(long offset) {
+    int bufferIndex = getBufferIndex(offset);
+    DataBuffer buffer = _buffers[bufferIndex];
+    long inBufferIndex = offset - _bufferOffsets[bufferIndex];
+    if (inBufferIndex + Short.BYTES <= buffer.size()) {
+      // fast path
+      return buffer.getShort(inBufferIndex);
+    } else {
+      // slow path
+      ByteBuffer byteBuffer = copy(offset, Short.BYTES);
+      return byteBuffer.getShort();
+    }
+  }
+
+  @Override
+  public void putShort(long offset, short value) {
+    int bufferIndex = getBufferIndex(offset);
+    DataBuffer buffer = _buffers[bufferIndex];
+    long inBufferIndex = offset - _bufferOffsets[bufferIndex];
+    if (inBufferIndex + Short.BYTES <= buffer.size()) {
+      // fast path
+      buffer.putShort(inBufferIndex, value);
+    } else {
+      // slow path
+      ByteBuffer byteBuffer = ByteBuffer.allocate(Short.BYTES)
+          .order(_order)
+          .putShort(value);
+      byteBuffer.flip();
+      readFrom(offset, byteBuffer);
+    }
+  }
+
+  @Override
+  public int getInt(long offset) {
+    int bufferIndex = getBufferIndex(offset);
+    DataBuffer buffer = _buffers[bufferIndex];
+    long inBufferIndex = offset - _bufferOffsets[bufferIndex];
+    if (inBufferIndex + Integer.BYTES <= buffer.size()) {
+      // fast path
+      return buffer.getInt(inBufferIndex);
+    } else {
+      // slow path
+      ByteBuffer byteBuffer = copy(offset, Integer.BYTES);
+      return byteBuffer.getInt();
+    }
+  }
+
+  @Override
+  public void putInt(long offset, int value) {
+    int bufferIndex = getBufferIndex(offset);
+    DataBuffer buffer = _buffers[bufferIndex];
+    long inBufferIndex = offset - _bufferOffsets[bufferIndex];
+    if (inBufferIndex + Integer.BYTES <= buffer.size()) {
+      // fast path
+      buffer.putInt(inBufferIndex, value);
+    } else {
+      // slow path
+      ByteBuffer byteBuffer = ByteBuffer.allocate(Integer.BYTES)
+          .order(_order)
+          .putInt(value);
+      byteBuffer.flip();
+      readFrom(offset, byteBuffer);
+    }
+  }
+
+  @Override
+  public long getLong(long offset) {
+    int bufferIndex = getBufferIndex(offset);
+    DataBuffer buffer = _buffers[bufferIndex];
+    long inBufferIndex = offset - _bufferOffsets[bufferIndex];
+    if (inBufferIndex + Long.BYTES <= buffer.size()) {
+      // fast path
+      return buffer.getLong(inBufferIndex);
+    } else {
+      // slow path
+      ByteBuffer byteBuffer = copy(offset, Long.BYTES);
+      return byteBuffer.getLong();
+    }
+  }
+
+  @Override
+  public void putLong(long offset, long value) {
+    int bufferIndex = getBufferIndex(offset);
+    DataBuffer buffer = _buffers[bufferIndex];
+    long inBufferIndex = offset - _bufferOffsets[bufferIndex];
+    if (inBufferIndex + Long.BYTES <= buffer.size()) {
+      // fast path
+      buffer.putLong(inBufferIndex, value);
+    } else {
+      // slow path
+      ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES)
+          .order(_order)
+          .putLong(value);
+      byteBuffer.flip();
+      readFrom(offset, byteBuffer);
+    }
+  }
+
+  @Override
+  public float getFloat(long offset) {
+    int bufferIndex = getBufferIndex(offset);
+    DataBuffer buffer = _buffers[bufferIndex];
+    long inBufferIndex = offset - _bufferOffsets[bufferIndex];
+    if (inBufferIndex + Float.BYTES <= buffer.size()) {
+      // fast path
+      return buffer.getFloat(inBufferIndex);
+    } else {
+      // slow path
+      ByteBuffer byteBuffer = copy(offset, Float.BYTES);
+      return byteBuffer.getFloat();
+    }
+  }
+
+  @Override
+  public void putFloat(long offset, float value) {
+    int bufferIndex = getBufferIndex(offset);
+    DataBuffer buffer = _buffers[bufferIndex];
+    long inBufferIndex = offset - _bufferOffsets[bufferIndex];
+    if (inBufferIndex + Float.BYTES <= buffer.size()) {
+      // fast path
+      buffer.putFloat(inBufferIndex, value);
+    } else {
+      // slow path
+      ByteBuffer byteBuffer = ByteBuffer.allocate(Float.BYTES)
+          .order(_order)
+          .putFloat(value);
+      byteBuffer.flip();
+      readFrom(offset, byteBuffer);
+    }
+  }
+
+  @Override
+  public double getDouble(long offset) {
+    int bufferIndex = getBufferIndex(offset);
+    DataBuffer buffer = _buffers[bufferIndex];
+    long inBufferIndex = offset - _bufferOffsets[bufferIndex];
+    if (inBufferIndex + Double.BYTES <= buffer.size()) {
+      // fast path
+      return buffer.getDouble(inBufferIndex);
+    } else {
+      // slow path
+      ByteBuffer byteBuffer = copy(offset, Double.BYTES);
+      return byteBuffer.getDouble();
+    }
+  }
+
+  @Override
+  public void putDouble(long offset, double value) {
+    int bufferIndex = getBufferIndex(offset);
+    DataBuffer buffer = _buffers[bufferIndex];
+    long inBufferIndex = offset - _bufferOffsets[bufferIndex];
+    if (inBufferIndex + Double.BYTES <= buffer.size()) {
+      // fast path
+      buffer.putDouble(inBufferIndex, value);
+    } else {
+      // slow path
+      ByteBuffer byteBuffer = ByteBuffer.allocate(Double.BYTES)
+          .order(_order)
+          .putDouble(value);
+      byteBuffer.flip();
+      readFrom(offset, byteBuffer);
+    }
+  }
+
+  @Override
+  public long size() {
+    return _size;
+  }
+
+  @Override
+  public ByteOrder order() {
+    return _order;
+  }
+
+  /**
+   * Returns a non-owning view of [start, end). When the range lies inside a single buffer, that buffer's own view is
+   * returned; otherwise a new {@link CompoundDataBuffer} over the affected buffers is created.
+   *
+   * @throws IllegalArgumentException if the range is not inside [0, size()] or start > end
+   */
+  @Override
+  public DataBuffer view(long start, long end, ByteOrder byteOrder) {
+    if (start < 0 || end > _size || start > end) {
+      throw new IllegalArgumentException("Invalid start/end: " + start + "/" + end);
+    }
+    if (start == end) {
+      // empty range: there is no byte at end - 1 to look up, so return an empty compound buffer
+      return new CompoundDataBuffer(new DataBuffer[0], byteOrder, false);
+    }
+    if (start == 0 && end == _size && byteOrder == _order) {
+      return new CompoundDataBuffer(_buffers, _order, false);
+    }
+    int startBufferIndex = getBufferIndex(start);
+    int endBufferIndex = getBufferIndex(end - 1);
+
+    if (startBufferIndex == endBufferIndex) {
+      long bufferOffset = _bufferOffsets[startBufferIndex];
+      DataBuffer buffer = _buffers[startBufferIndex];
+      return buffer.view(start - bufferOffset, end - bufferOffset, byteOrder);
+    } else {
+      DataBuffer firstBuffer = _buffers[startBufferIndex]
+          .view(start - _bufferOffsets[startBufferIndex], _buffers[startBufferIndex].size(), byteOrder);
+      DataBuffer lastBuffer = _buffers[endBufferIndex]
+          .view(0, end - _bufferOffsets[endBufferIndex], byteOrder);
+
+      DataBuffer[] buffers = new DataBuffer[endBufferIndex - startBufferIndex + 1];
+      buffers[0] = firstBuffer;
+      if (buffers.length > 2) {
+        // middle buffers are fully included; the constructor converts them to byteOrder if needed
+        System.arraycopy(_buffers, startBufferIndex + 1, buffers, 1, buffers.length - 2);
+      }
+      buffers[buffers.length - 1] = lastBuffer;
+
+      return new CompoundDataBuffer(buffers, byteOrder, false);
+    }
+  }
+
+  /**
+   * Returns a bitmap over [offset, offset + length). If the range lies inside a single buffer, that buffer is asked
+   * for the bitmap; otherwise the bytes are copied into a heap buffer first.
+   */
+  @Override
+  public ImmutableRoaringBitmap viewAsRoaringBitmap(long offset, int length) {
+    int bufferIndex = getBufferIndex(offset);
+    DataBuffer buffer = _buffers[bufferIndex];
+    // child buffers are addressed with offsets relative to their own start
+    long inBufferIndex = offset - _bufferOffsets[bufferIndex];
+    if (inBufferIndex + length <= buffer.size()) {
+      return buffer.viewAsRoaringBitmap(inBufferIndex, length);
+    } else {
+      ByteBuffer copy = copy(offset, length);
+      return new ImmutableRoaringBitmap(copy);
+    }
+  }
+
+  @Override
+  public ByteBuffer copyOrView(long offset, int size, ByteOrder byteOrder) {
+    if (offset + size > _size) {
+      throw new BufferUnderflowException();
+    }
+    if (size == 0) {
+      return ByteBuffer.allocate(0).order(byteOrder);
+    }
+    int bufferIndex = getBufferIndex(offset);
+    DataBuffer buffer = _buffers[bufferIndex];
+    long inBufferIndex = offset - _bufferOffsets[bufferIndex];
+    if (inBufferIndex + size <= buffer.size()) {
+      return buffer.copyOrView(inBufferIndex, size, byteOrder);
+    } else {
+      return copy(offset, size).order(byteOrder);
+    }
+  }
+
+  /**
+   * Flushes every underlying buffer, even if some of them fail, and rethrows the first failure.
+   * Flushing is sent to the buffers given at construction time (not to byte-order-converting views, on which flush has
+   * no effect).
+   */
+  @Override
+  public void flush() {
+    RuntimeException firstException = null;
+
+    for (DataBuffer buffer : _sourceBuffers) {
+      try {
+        buffer.flush();
+      } catch (RuntimeException ex) {
+        if (firstException == null) {
+          firstException = ex;
+        }
+      }
+    }
+    if (firstException != null) {
+      throw firstException;
+    }
+  }
+
+  /**
+   * If this buffer is the owner, closes every buffer given at construction time (trying all of them and rethrowing
+   * the first {@link IOException}); otherwise does nothing. The original buffers are closed rather than the
+   * byte-order-converting views, because closing a view has no effect and would leak the underlying buffer.
+   */
+  @Override
+  public void close()
+      throws IOException {
+    if (!_owner) {
+      return;
+    }
+    IOException firstException = null;
+
+    for (DataBuffer buffer : _sourceBuffers) {
+      try {
+        buffer.close();
+      } catch (IOException ex) {
+        if (firstException == null) {
+          firstException = ex;
+        }
+      }
+    }
+    if (firstException != null) {
+      throw firstException;
+    }
+  }
+
+  /**
+   * Returns the internal array of buffers, all of them with this buffer's byte order. The array is not copied, so
+   * callers must not modify it.
+   */
+  public DataBuffer[] getBuffers() {
+    return _buffers;
+  }
+
+  @Override
+  public void appendAsByteBuffers(List<ByteBuffer> appendTo) {
+    for (DataBuffer buffer : _buffers) {
+      buffer.appendAsByteBuffers(appendTo);
+    }
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof DataBuffer)) {
+      return false;
+    }
+    DataBuffer buffer = (DataBuffer) o;
+    return DataBuffer.sameContent(this, buffer);
+  }
+
+  @Override
+  public int hashCode() {
+    return DataBuffer.commonHashCode(this);
+  }
+
+  /**
+   * Incrementally collects buffers for a {@link CompoundDataBuffer}. Empty buffers are skipped and nested compound
+   * buffers are flattened. Defaults: big-endian, not owner.
+   */
+  public static class Builder {
+    private final ByteOrder _order;
+    private final boolean _owner;
+    private final ArrayList<DataBuffer> _buffers = new ArrayList<>();
+
+    public Builder() {
+      this(ByteOrder.BIG_ENDIAN, false);
+    }
+
+    public Builder(ByteOrder order) {
+      this(order, false);
+    }
+
+    public Builder(boolean owner) {
+      this(ByteOrder.BIG_ENDIAN, owner);
+    }
+
+    public Builder(ByteOrder order, boolean owner) {
+      _order = order;
+      _owner = owner;
+    }
+
+    public Builder addBuffer(DataBuffer buffer) {
+      if (buffer instanceof CompoundDataBuffer) {
+        CompoundDataBuffer compoundBuffer = (CompoundDataBuffer) buffer;
+        // Flatten using the original children (not byte-order-converting views), so that an owning result closes
+        // and flushes the real buffers; the built buffer converts them to its own order if needed.
+        for (DataBuffer childBuffer : compoundBuffer._sourceBuffers) {
+          addBuffer(childBuffer);
+        }
+        return this;
+      }
+      if (buffer.size() != 0) {
+        _buffers.add(buffer);
+      }
+      return this;
+    }
+
+    /** Adds every page of the given stream, wrapped with {@link PinotByteBuffer#wrap}. */
+    public Builder addPagedOutputStream(PagedPinotOutputStream stream) {
+      ByteBuffer[] pages = stream.getPages();
+      for (ByteBuffer page : pages) {
+        addBuffer(PinotByteBuffer.wrap(page));
+      }
+      return this;
+    }
+
+    public CompoundDataBuffer build() {
+      return new CompoundDataBuffer(_buffers, _order, _owner);
+    }
+  }
+}
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/DataBuffer.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/DataBuffer.java
new file mode 100644
index 0000000000..ec0d0c3261
--- /dev/null
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/DataBuffer.java
@@ -0,0 +1,288 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.memory;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.List;
+import java.util.Objects;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+
+
+/**
+ * A region of {@link #size()} bytes that can be read and written at absolute offsets. Multi-byte values are read and
+ * written using the byte order returned by {@link #order()}.
+ * <p>
+ * Every accessor that takes an {@code int} offset is a convenience overload that widens the offset to {@code long}
+ * and delegates to the {@code long} variant.
+ */
+public interface DataBuffer extends Closeable {
+
+  default byte getByte(int offset) {
+    return getByte((long) offset);
+  }
+
+  byte getByte(long offset);
+
+  default void putByte(int offset, byte value) {
+    putByte((long) offset, value);
+  }
+
+  void putByte(long offset, byte value);
+
+  default char getChar(int offset) {
+    return getChar((long) offset);
+  }
+
+  char getChar(long offset);
+
+  default void putChar(int offset, char value) {
+    putChar((long) offset, value);
+  }
+
+  void putChar(long offset, char value);
+
+  default short getShort(int offset) {
+    return getShort((long) offset);
+  }
+
+  short getShort(long offset);
+
+  default void putShort(int offset, short value) {
+    putShort((long) offset, value);
+  }
+
+  void putShort(long offset, short value);
+
+  default int getInt(int offset) {
+    return getInt((long) offset);
+  }
+
+  int getInt(long offset);
+
+  default void putInt(int offset, int value) {
+    putInt((long) offset, value);
+  }
+
+  void putInt(long offset, int value);
+
+  default long getLong(int offset) {
+    return getLong((long) offset);
+  }
+
+  long getLong(long offset);
+
+  default void putLong(int offset, long value) {
+    putLong((long) offset, value);
+  }
+
+  void putLong(long offset, long value);
+
+  default float getFloat(int offset) {
+    return getFloat((long) offset);
+  }
+
+  float getFloat(long offset);
+
+  default void putFloat(int offset, float value) {
+    putFloat((long) offset, value);
+  }
+
+  void putFloat(long offset, float value);
+
+  default double getDouble(int offset) {
+    return getDouble((long) offset);
+  }
+
+  double getDouble(long offset);
+
+  default void putDouble(int offset, double value) {
+    putDouble((long) offset, value);
+  }
+
+  void putDouble(long offset, double value);
+
+  /**
+   * Copies {@code size} bytes of this buffer, starting at {@code offset}, into {@code buffer} starting at
+   * {@code destOffset}.
+   * The first byte to be copied is the one that could be read with {@code this.getByte(offset)}
+   */
+  void copyTo(long offset, byte[] buffer, int destOffset, int size);
+
+  /**
+   * Fills the whole {@code buffer} array with the bytes of this buffer that start at {@code offset}.
+   * The first byte to be copied is the one that could be read with {@code this.getByte(offset)}
+   */
+  default void copyTo(long offset, byte[] buffer) {
+    copyTo(offset, buffer, 0, buffer.length);
+  }
+
+  /**
+   * Copies {@code size} bytes of this buffer, starting at {@code offset}, into {@code buffer} starting at
+   * {@code destOffset}.
+   * <p>
+   * Note: It is the responsibility of the caller to make sure arguments are checked before the methods are called.
+   * While some rudimentary checks are performed on the input, the checks are best effort and when performance is an
+   * overriding priority, as when methods of this class are optimized by the runtime compiler, some or all checks
+   * (if any) may be elided. Hence, the caller must not rely on the checks and corresponding exceptions!
+   */
+  void copyTo(long offset, DataBuffer buffer, long destOffset, long size);
+
+  /**
+   * Copies {@code size} bytes of this buffer, starting at {@code offset}, into {@code buffer}. The destination is
+   * wrapped with {@link PinotByteBuffer#wrap(ByteBuffer)} and {@code destOffset} is an offset in that wrapper.
+   */
+  default void copyTo(long offset, ByteBuffer buffer, int destOffset, int size) {
+    PinotByteBuffer wrap = PinotByteBuffer.wrap(buffer);
+    copyTo(offset, wrap, destOffset, size);
+  }
+
+  /**
+   * Writes {@code size} bytes of the {@code buffer} array, starting at {@code srcOffset}, into this buffer starting
+   * at {@code offset}.
+   */
+  void readFrom(long offset, byte[] buffer, int srcOffset, int size);
+
+  /**
+   * Writes the whole {@code buffer} array into this buffer starting at {@code offset}.
+   */
+  default void readFrom(long offset, byte[] buffer) {
+    readFrom(offset, buffer, 0, buffer.length);
+  }
+
+  /**
+   * Writes the bytes between the position and the limit of {@code buffer} into this buffer starting at
+   * {@code offset}.
+   */
+  void readFrom(long offset, ByteBuffer buffer);
+
+  /**
+   * Writes {@code size} bytes read from {@code file}, starting at file offset {@code srcOffset}, into this buffer
+   * starting at {@code offset}.
+   */
+  void readFrom(long offset, File file, long srcOffset, long size)
+      throws IOException;
+
+  /** Returns the number of bytes in this buffer. */
+  long size();
+
+  /** Returns the byte order used to read and write multi-byte values. */
+  ByteOrder order();
+
+  /**
+   * Creates a view of the range [start, end) of this buffer with the given byte order. Calling {@link #flush()} or
+   * {@link #close()} has no effect on view.
+   */
+  DataBuffer view(long start, long end, ByteOrder byteOrder);
+
+  /**
+   * Creates a view of the range [start, end) of this buffer with the current byte order. Calling {@link #flush()} or
+   * {@link #close()} has no effect on view.
+   */
+  default DataBuffer view(long start, long end) {
+    return view(start, end, order());
+  }
+
+  /** Flushes this buffer. Has no effect when called on a view (see {@link #view(long, long, ByteOrder)}). */
+  void flush();
+
+  /** Opens a {@link PinotInputStream} over the whole buffer. */
+  default PinotInputStream openInputStream() {
+    return new DataBufferPinotInputStream(this);
+  }
+
+  /** Opens a {@link PinotInputStream} over the range [offset, size()). */
+  default PinotInputStream openInputStream(long offset) {
+    return openInputStream(offset, size() - offset);
+  }
+
+  /** Opens a {@link PinotInputStream} over the range [offset, offset + length). */
+  default PinotInputStream openInputStream(long offset, long length) {
+    return new DataBufferPinotInputStream(this, offset, offset + length);
+  }
+
+  /**
+   * Reads the range [offset, offset + length) as a RoaringBitmap.
+   * <p>
+   * Implementations should do their best to not allocate memory and instead return a view of the underlying data.
+   */
+  ImmutableRoaringBitmap viewAsRoaringBitmap(long offset, int length);
+
+  /**
+   * Returns a ByteBuffer whose content is the same as the range [offset, offset + size) of this buffer.
+   * <p>
+   * Implementations should do their best to not allocate memory and instead return a view of the underlying data.
+   * Callers cannot assume they have the ownership of the returned ByteBuffer, and therefore should not call
+   * {@link CleanerUtil#cleanQuietly(ByteBuffer)} using the returned object.
+   */
+  ByteBuffer copyOrView(long offset, int size, ByteOrder byteOrder);
+
+  /**
+   * Appends ByteBuffers representing this buffer to {@code appendTo}. Presumably their concatenation, in order, is
+   * the content of this buffer; confirm against each implementation.
+   */
+  void appendAsByteBuffers(List<ByteBuffer> appendTo);
+
+  /**
+   * Returns a ByteBuffer whose content is the same as the range [offset, offset + size) of this buffer.
+   * <p>
+   * Implementations should do their best to not allocate memory and instead return a view of the underlying data.
+   * Callers cannot assume they have the ownership of the returned ByteBuffer, and therefore should not call
+   * {@link CleanerUtil#cleanQuietly(ByteBuffer)} using the returned object.
+   * <p>
+   * The returned ByteBuffer will have the same byte order as this buffer.
+   */
+  default ByteBuffer copyOrView(long offset, int size) {
+    return copyOrView(offset, size, order());
+  }
+
+  /**
+   * Returns whether both buffers have the same size and the same bytes. The byte order of the buffers is ignored:
+   * both are compared through native-order views.
+   */
+  static boolean sameContent(DataBuffer buffer1, DataBuffer buffer2) {
+    long size = buffer1.size();
+    if (size != buffer2.size()) {
+      return false;
+    }
+    DataBuffer nativeBuffer1 = buffer1.view(0, size, ByteOrder.nativeOrder());
+    DataBuffer nativeBuffer2 = buffer2.view(0, size, ByteOrder.nativeOrder());
+
+    // compare 8 bytes at a time, then the trailing bytes one by one
+    long maxLong = size & ~7L;
+    for (long i = 0; i < maxLong; i += 8) {
+      if (nativeBuffer1.getLong(i) != nativeBuffer2.getLong(i)) {
+        return false;
+      }
+    }
+    for (long i = maxLong; i < size; i++) {
+      if (buffer1.getByte(i) != buffer2.getByte(i)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Computes a hash code consistent with {@link #sameContent(DataBuffer, DataBuffer)}. Buffers with the same size and
+   * bytes get the same hash code, whatever their byte order.
+   * <p>
+   * Only the size and (up to) the first and last 8 bytes are hashed, so the cost does not grow with the buffer size.
+   */
+  static int commonHashCode(DataBuffer dataBuffer) {
+    long size = dataBuffer.size();
+    // Bytes are read one at a time and folded in a fixed order. Reading them with getShort/getInt/getLong would make
+    // the hash depend on order(), while sameContent() considers buffers that only differ in byte order equal.
+    int sampledBytes = (int) Math.min(size, Long.BYTES);
+    long firstLong = 0;
+    long lastLong = 0;
+    for (int i = 0; i < sampledBytes; i++) {
+      firstLong = (firstLong << 8) | (dataBuffer.getByte((long) i) & 0xFFL);
+      lastLong = (lastLong << 8) | (dataBuffer.getByte(size - sampledBytes + i) & 0xFFL);
+    }
+    return Objects.hash(size, firstLong, lastLong);
+  }
+}
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/DataBufferPinotInputStream.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/DataBufferPinotInputStream.java
new file mode 100644
index 0000000000..e470b4863b
--- /dev/null
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/DataBufferPinotInputStream.java
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.memory;
+
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+
+/**
+ * An adaptor that allows a {@link DataBuffer} to be read as a {@link
PinotInputStream}.
+ */
+public class DataBufferPinotInputStream extends PinotInputStream {
+ private final DataBuffer _dataBuffer;
+ private long _currentOffset;
+
+ public DataBufferPinotInputStream(DataBuffer dataBuffer) {
+ this(dataBuffer, 0, dataBuffer.size());
+ }
+
+ public DataBufferPinotInputStream(DataBuffer dataBuffer, long startOffset,
long endOffset) {
+ _dataBuffer = dataBuffer.view(startOffset, endOffset,
ByteOrder.BIG_ENDIAN);
+ _currentOffset = 0;
+ }
+
+ @Override
+ public long getCurrentOffset() {
+ return _currentOffset;
+ }
+
+ @Override
+ public void seek(long newPos) {
+ if (newPos < 0 || newPos > _dataBuffer.size()) {
+ throw new IllegalArgumentException("Invalid new position: " + newPos);
+ }
+ _currentOffset = newPos;
+ }
+
+ @Override
+ public int read(ByteBuffer buf) {
+ int remaining = available();
+ if (remaining == 0) {
+ return -1;
+ }
+ PinotByteBuffer wrap = PinotByteBuffer.wrap(buf);
+ int toRead = Math.min(remaining, buf.remaining());
+ if (toRead > 0) {
+ _dataBuffer.copyTo(_currentOffset, wrap, 0, toRead);
+ _currentOffset += toRead;
+ }
+
+ return toRead;
+ }
+
+ @Override
+ public int read() {
+ if (_currentOffset >= _dataBuffer.size()) {
+ return -1;
+ } else {
+ return _dataBuffer.getByte(_currentOffset++) & 0xFF;
+ }
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) {
+ if (off < 0 || len < 0 || len > b.length - off) {
+ throw new IndexOutOfBoundsException("off=" + off + ", len=" + len + ",
b.length=" + b.length);
+ }
+ int available = available();
+ if (available == 0) {
+ return -1;
+ }
+ int result = Math.min(available, len);
+ _dataBuffer.copyTo(_currentOffset, b, off, result);
+ _currentOffset += result;
+ return result;
+ }
+
+ @Override
+ public long skip(long n) {
+ long increase = Math.min(n, availableLong());
+ _currentOffset += increase;
+ return increase;
+ }
+
+ public long availableLong() {
+ return _dataBuffer.size() - _currentOffset;
+ }
+
+ @Override
+ public int available() {
+ long available = _dataBuffer.size() - _currentOffset;
+ if (available > Integer.MAX_VALUE) {
+ return Integer.MAX_VALUE;
+ } else {
+ return (int) available;
+ }
+ }
+
+ @Override
+ public void readFully(byte[] b, int off, int len)
+ throws EOFException {
+ if (len < 0) {
+ throw new IndexOutOfBoundsException("len is negative: " + len);
+ }
+ if (off < 0 || off + len > b.length) {
+ throw new IndexOutOfBoundsException("off=" + off + ", len=" + len + ",
b.length=" + b.length);
+ }
+ // the javadoc of DataInput.readFully(byte[], int, int) says that the
method will block until the requested
+ // number of bytes has been read, end of file is detected, or an exception
is thrown.
+ // So being formal, we should modify the buffer even if we know we are
going to reach EOF.
+ boolean eof = availableLong() < len;
+ _dataBuffer.copyTo(_currentOffset, b, off, len);
+ _currentOffset += len;
+ if (eof) {
+ throw new EOFException();
+ }
+ }
+
+ @Override
+ public boolean readBoolean()
+ throws EOFException {
+ if (_currentOffset >= _dataBuffer.size()) {
+ throw new EOFException();
+ }
+ return _dataBuffer.getByte(_currentOffset++) != 0;
+ }
+
+ @Override
+ public byte readByte()
+ throws EOFException {
+ if (availableLong() < 1) {
+ throw new EOFException();
+ }
+ return _dataBuffer.getByte(_currentOffset++);
+ }
+
+ @Override
+ public int readUnsignedByte()
+ throws EOFException {
+ if (availableLong() < 1) {
+ throw new EOFException();
+ }
+ return _dataBuffer.getByte(_currentOffset++) & 0xFF;
+ }
+
+ @Override
+ public short readShort()
+ throws EOFException {
+ if (availableLong() < 2) {
+ throw new EOFException();
+ }
+ short result = _dataBuffer.getShort(_currentOffset);
+ _currentOffset += 2;
+ return result;
+ }
+
+ @Override
+ public int readUnsignedShort()
+ throws EOFException {
+ return readShort() & 0xFFFF;
+ }
+
+ @Override
+ public char readChar()
+ throws EOFException {
+ return (char) readUnsignedShort();
+ }
+
+ @Override
+ public int readInt()
+ throws EOFException {
+ if (availableLong() < 4) {
+ throw new EOFException();
+ }
+ int result = _dataBuffer.getInt(_currentOffset);
+ _currentOffset += 4;
+ return result;
+ }
+
+ @Override
+ public long readLong()
+ throws EOFException {
+ if (availableLong() < 8) {
+ throw new EOFException();
+ }
+ long result = _dataBuffer.getLong(_currentOffset);
+ _currentOffset += 8;
+ return result;
+ }
+
+ @Override
+ public float readFloat()
+ throws EOFException {
+ if (availableLong() < 4) {
+ throw new EOFException();
+ }
+ float result = _dataBuffer.getFloat(_currentOffset);
+ _currentOffset += 4;
+ return result;
+ }
+
+ @Override
+ public double readDouble()
+ throws EOFException {
+ if (availableLong() < 8) {
+ throw new EOFException();
+ }
+ double result = _dataBuffer.getDouble(_currentOffset);
+ _currentOffset += 8;
+ return result;
+ }
+
+ @Deprecated
+ @Override
+ public String readLine()
+ throws IOException {
+ return new DataInputStream(this).readLine();
+ }
+
+ @Override
+ public String readUTF()
+ throws IOException {
+ return new DataInputStream(this).readUTF();
+ }
+}
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/NonNativePinotDataBuffer.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/NonNativePinotDataBuffer.java
index 755345666f..3d29cb549e 100644
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/NonNativePinotDataBuffer.java
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/NonNativePinotDataBuffer.java
@@ -205,7 +205,7 @@ public class NonNativePinotDataBuffer extends
PinotDataBuffer {
}
@Override
- public void copyTo(long offset, PinotDataBuffer buffer, long destOffset,
long size) {
+ public void copyTo(long offset, DataBuffer buffer, long destOffset, long
size) {
_nativeBuffer.copyTo(offset, buffer, destOffset, size);
}
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/PagedPinotOutputStream.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/PagedPinotOutputStream.java
new file mode 100644
index 0000000000..4153761e99
--- /dev/null
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/PagedPinotOutputStream.java
@@ -0,0 +1,406 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.memory;
+
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.ArrayList;
+import org.apache.commons.lang3.ArchUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * An implementation of {@link PinotOutputStream} that writes to a sequence of
pages.
+ * <p>
+ * When a page is full, a new page is allocated and writing continues there.
+ * The page size is determined by the {@link PageAllocator} used to create the
stream.
+ * <p>
+ * This class is specially useful when writing data whose final size is not
known in advance or when it is needed to
+ * {@link #seek(long) seek} to a specific position in the stream.
+ * Writes that cross page boundaries are handled transparently, although a
performance penalty will be paid.
+ * <p>
+ * Some {@link PageAllocator} implementations may support releasing pages,
which can be useful to reduce memory usage.
+ * The smaller the page size, the more pages will be allocated, but the
allocation is quite faster. This is specially
+ * important in the case of heap allocation, in which case allocations smaller
than TLAB will be faster.
+ * <p>
+ * Data written in this stream can be retrieved as a list of {@link
ByteBuffer} pages using {@link #getPages()}, which
+ * can be directly read using {@link CompoundDataBuffer} or send to the
network as using
+ * {@link com.google.protobuf.UnsafeByteOperations#unsafeWrap(byte[]) GRPC}.
+ */
+public class PagedPinotOutputStream extends PinotOutputStream {
+ private final PageAllocator _allocator;
+ private final int _pageSize;
+ private final ArrayList<ByteBuffer> _pages;
+ private ByteBuffer _currentPage;
+ private long _currentPageStartOffset;
+ private int _offsetInPage;
+ private long _written = 0;
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PagedPinotOutputStream.class);
+
+ public PagedPinotOutputStream(PageAllocator allocator) {
+ _pageSize = allocator.pageSize();
+ _allocator = allocator;
+ _pages = new ArrayList<>(8);
+ _currentPage = _allocator.allocate().order(ByteOrder.BIG_ENDIAN);
+ _currentPageStartOffset = 0;
+ _pages.add(_currentPage);
+ }
+
+ public static PagedPinotOutputStream createHeap() {
+ return new PagedPinotOutputStream(HeapPageAllocator.createSmall());
+ }
+
+ private void nextPage() {
+ moveCurrentOffset(remainingInPage());
+ }
+
+ private int remainingInPage() {
+ return _pageSize - _offsetInPage;
+ }
+
+ /**
+ * Returns a read only view of the pages written so far.
+ * <p>
+ * All pages but the last one will have its position set to 0 and the limit
to their capacity.
+ * The latest page will have its position set to 0 and its limit set to the
last byte written.
+ *
+ * TODO: Add one option that let caller choose start and end offset.
+ */
+ public ByteBuffer[] getPages() {
+ int numPages = _pages.size();
+
+ boolean lastPageIsEmpty = _written == (numPages - 1) * (long) _pageSize;
+ if (lastPageIsEmpty) {
+ numPages--;
+ }
+ if (numPages == 0) {
+ return new ByteBuffer[0];
+ }
+ ByteBuffer[] result = new ByteBuffer[numPages];
+
+ for (int i = 0; i < numPages; i++) {
+ ByteBuffer byteBuffer = _pages.get(i);
+ ByteBuffer page = byteBuffer.asReadOnlyBuffer();
+ page.clear();
+ result[i] = page;
+ }
+
+ if (!lastPageIsEmpty) {
+ long startOffset = getCurrentOffset();
+ seek(_written);
+ result[numPages - 1].limit(_offsetInPage);
+ seek(startOffset);
+ }
+
+ return result;
+ }
+
+ @Override
+ public long getCurrentOffset() {
+ return _currentPageStartOffset + _offsetInPage;
+ }
+
+ @Override
+ public void seek(long newPos) {
+ if (newPos < 0) {
+ throw new IllegalArgumentException("New position cannot be negative");
+ }
+ if (newPos == 0) {
+ _currentPage = _pages.get(0);
+ _offsetInPage = 0;
+ } else {
+ int pageIdx = (int) (newPos / _pageSize);
+ if (pageIdx >= _pages.size()) {
+ _pages.ensureCapacity(pageIdx + 1);
+ while (_pages.size() <= pageIdx) {
+ _pages.add(_allocator.allocate().order(ByteOrder.BIG_ENDIAN));
+ }
+ }
+ int offsetInPage = (int) (newPos % _pageSize);
+ _currentPage = _pages.get(pageIdx);
+ _currentPageStartOffset = pageIdx * (long) _pageSize;
+ _offsetInPage = offsetInPage;
+ }
+ _written = Math.max(_written, newPos);
+ }
+
+ @Override
+ public void write(int b)
+ throws IOException {
+ if (remainingInPage() == 0) {
+ nextPage();
+ }
+ _currentPage.put(_offsetInPage++, (byte) b);
+ _written = Math.max(_written, _offsetInPage + _currentPageStartOffset);
+ }
+
+ @Override
+ public void writeInt(int v)
+ throws IOException {
+ if (remainingInPage() >= Integer.BYTES) {
+ _currentPage.putInt(_offsetInPage, v);
+ _offsetInPage += Integer.BYTES;
+ _written = Math.max(_written, _offsetInPage + _currentPageStartOffset);
+ } else {
+ super.writeInt(v);
+ }
+ }
+
+ @Override
+ public void writeLong(long v)
+ throws IOException {
+ if (remainingInPage() >= Long.BYTES) {
+ _currentPage.putLong(_offsetInPage, v);
+ _offsetInPage += Long.BYTES;
+ _written = Math.max(_written, _offsetInPage + _currentPageStartOffset);
+ } else {
+ super.writeLong(v);
+ }
+ }
+
+ @Override
+ public void writeShort(int v)
+ throws IOException {
+ if (remainingInPage() >= Short.BYTES) {
+ _currentPage.putShort(_offsetInPage, (short) v);
+ _offsetInPage += Short.BYTES;
+ _written = Math.max(_written, _offsetInPage + _currentPageStartOffset);
+ } else {
+ super.writeShort(v);
+ }
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len)
+ throws IOException {
+ if (remainingInPage() >= len) {
+ _currentPage.position(_offsetInPage);
+ _currentPage.put(b, off, len);
+ _offsetInPage += len;
+ _currentPage.position(0);
+ } else {
+ int written = 0;
+ while (written < len) {
+ int remainingInPage = remainingInPage();
+ if (remainingInPage == 0) {
+ nextPage();
+ continue;
+ }
+ int toWrite = Math.min(len - written, remainingInPage);
+ _currentPage.position(_offsetInPage);
+ _currentPage.put(b, off + written, toWrite);
+ _currentPage.position(0);
+ written += toWrite;
+ _offsetInPage += toWrite;
+ }
+ }
+ _written = Math.max(_written, _offsetInPage + _currentPageStartOffset);
+ }
+
+ @Override
+ public void write(DataBuffer input, long offset, long length)
+ throws IOException {
+ if (remainingInPage() >= length) {
+ int intLength = (int) length;
+ input.copyTo(offset, _currentPage, _offsetInPage, intLength);
+ _offsetInPage += intLength;
+ } else {
+ long written = 0;
+ while (written < length) {
+ int remainingInPage = remainingInPage();
+ if (remainingInPage == 0) {
+ nextPage();
+ continue;
+ }
+ int toWrite = (int) Math.min(length - written, remainingInPage);
+ input.copyTo(offset + written, _currentPage, _offsetInPage, toWrite);
+ written += toWrite;
+ _offsetInPage += toWrite;
+ }
+ }
+ _written = Math.max(_written, _offsetInPage + _currentPageStartOffset);
+ }
+
+ /**
+ * Returns a view of the data written so far as a {@link DataBuffer}.
+ * <p>
+ * The returned DataBuffer will contain all the data being written. This is
specially important when
+ * {@link #getCurrentOffset()} has been moved back from the latest written
position.
+ *
+ * TODO: Add one option that let caller choose start and end offset.
+ */
+ public DataBuffer asBuffer(ByteOrder order, boolean owner) {
+ if (_written == 0) {
+ return PinotDataBuffer.empty();
+ }
+
+ // TODO: We can remove this check
+ ByteBuffer[] pages = getPages();
+ for (int i = 0; i < pages.length; i++) {
+ ByteBuffer page = pages[i];
+ if (page.remaining() != _pageSize && (i != pages.length - 1 ||
!page.hasRemaining())) {
+ throw new IllegalArgumentException("Unexpected remaining bytes in page
" + i + ": " + page.remaining());
+ }
+ }
+
+ return new CompoundDataBuffer(pages, order, owner);
+ }
+
+ public int getPageSize() {
+ return _pageSize;
+ }
+
+ @Override
+ public void close()
+ throws IOException {
+ IOException ex = null;
+ for (ByteBuffer page : _pages) {
+ try {
+ _allocator.release(page);
+ } catch (IOException e) {
+ if (ex == null) {
+ ex = e;
+ } else {
+ ex.addSuppressed(e);
+ }
+ }
+ }
+ if (ex != null) {
+ throw ex;
+ }
+ }
+
+ public static abstract class PageAllocator {
+ public static final int MIN_RECOMMENDED_PAGE_SIZE;
+ public static final int MAX_RECOMMENDED_PAGE_SIZE;
+
+ static {
+ int minRecommendedPageSize = -1;
+ int maxRecommendedPageSize = -1;
+ try {
+ switch (ArchUtils.getProcessor().getType()) {
+ case AARCH_64:
+ // ARM processors support 4KB and 1MB pages
+ minRecommendedPageSize = 16 * 1024;
+ maxRecommendedPageSize = 1024 * 1024;
+ break;
+ case X86:
+ default:
+ // X86 processors support 4KB and 4MB pages
+ minRecommendedPageSize = 4 * 1024;
+ maxRecommendedPageSize = 4 * 1024 * 1024;
+ break;
+ }
+ } catch (Throwable t) {
+ LOGGER.warn("Could not determine processor architecture. Falling back
to default values", t);
+ // Fallback to 4KB and 4MBs
+ minRecommendedPageSize = 4 * 1024;
+ maxRecommendedPageSize = 4 * 1024 * 1024;
+ }
+ MIN_RECOMMENDED_PAGE_SIZE = minRecommendedPageSize;
+ MAX_RECOMMENDED_PAGE_SIZE = maxRecommendedPageSize;
+ }
+
+ public abstract int pageSize();
+
+ public abstract ByteBuffer allocate();
+
+ public abstract void release(ByteBuffer buffer)
+ throws IOException;
+ }
+
+ public static class HeapPageAllocator extends PageAllocator {
+
+ private final int _pageSize;
+
+ public static HeapPageAllocator createSmall() {
+ return new HeapPageAllocator(MIN_RECOMMENDED_PAGE_SIZE);
+ }
+
+ public static HeapPageAllocator createLarge() {
+ return new HeapPageAllocator(MAX_RECOMMENDED_PAGE_SIZE);
+ }
+
+ public HeapPageAllocator(int pageSize) {
+ Preconditions.checkArgument(pageSize > 0, "Page size must be positive");
+ _pageSize = pageSize;
+ }
+
+ @Override
+ public int pageSize() {
+ return _pageSize;
+ }
+
+ @Override
+ public ByteBuffer allocate() {
+ return ByteBuffer.allocate(_pageSize);
+ }
+
+ @Override
+ public void release(ByteBuffer buffer) {
+ // Do nothing
+ }
+ }
+
+ public static class DirectPageAllocator extends PageAllocator {
+
+ private final int _pageSize;
+ private final boolean _release;
+
+ public DirectPageAllocator(int pageSize) {
+ this(pageSize, false);
+ }
+
+ public DirectPageAllocator(int pageSize, boolean release) {
+ Preconditions.checkArgument(pageSize > 0, "Page size must be positive");
+ _pageSize = pageSize;
+ _release = release;
+ }
+
+ public static DirectPageAllocator createSmall(boolean release) {
+ return new DirectPageAllocator(MIN_RECOMMENDED_PAGE_SIZE, release);
+ }
+
+ public static DirectPageAllocator createLarge(boolean release) {
+ return new DirectPageAllocator(MAX_RECOMMENDED_PAGE_SIZE, release);
+ }
+
+ @Override
+ public int pageSize() {
+ return _pageSize;
+ }
+
+ @Override
+ public ByteBuffer allocate() {
+ return ByteBuffer.allocateDirect(_pageSize);
+ }
+
+ @Override
+ public void release(ByteBuffer buffer)
+ throws IOException {
+ if (_release && CleanerUtil.UNMAP_SUPPORTED) {
+ CleanerUtil.getCleaner().freeBuffer(buffer);
+ }
+ }
+ }
+}
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/PinotByteBuffer.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/PinotByteBuffer.java
index 1cb357ea44..c3fc91f28b 100644
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/PinotByteBuffer.java
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/PinotByteBuffer.java
@@ -31,6 +31,8 @@ import javax.annotation.concurrent.ThreadSafe;
@ThreadSafe
public class PinotByteBuffer extends PinotDataBuffer {
+ public static final PinotDataBuffer EMPTY =
PinotByteBuffer.wrap(ByteBuffer.allocate(0));
+
private final ByteBuffer _buffer;
private final boolean _flushable;
@@ -60,6 +62,14 @@ public class PinotByteBuffer extends PinotDataBuffer {
}
}
+ public static PinotByteBuffer wrap(byte[] buffer) {
+ return new PinotByteBuffer(ByteBuffer.wrap(buffer), false, false);
+ }
+
+ public static PinotByteBuffer wrap(ByteBuffer buffer) {
+ return new PinotByteBuffer(buffer, false, false);
+ }
+
private PinotByteBuffer(ByteBuffer buffer, boolean closeable, boolean
flushable) {
super(closeable);
_buffer = buffer;
@@ -237,7 +247,7 @@ public class PinotByteBuffer extends PinotDataBuffer {
}
@Override
- public void copyTo(long offset, PinotDataBuffer buffer, long destOffset,
long size) {
+ public void copyTo(long offset, DataBuffer buffer, long destOffset, long
size) {
assert offset <= Integer.MAX_VALUE;
assert size <= Integer.MAX_VALUE;
int start = (int) offset;
@@ -282,7 +292,7 @@ public class PinotByteBuffer extends PinotDataBuffer {
ByteBuffer duplicate = _buffer.duplicate();
int start = (int) offset;
int end = start + (int) size;
- ((Buffer) duplicate).position(start).limit(end);
+ duplicate.position(start).limit(end);
channel.read(duplicate, srcOffset);
}
}
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/PinotDataBuffer.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/PinotDataBuffer.java
index 425f7e6b84..2ed9d22e92 100644
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/PinotDataBuffer.java
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/PinotDataBuffer.java
@@ -19,7 +19,7 @@
package org.apache.pinot.segment.spi.memory;
import com.google.common.annotations.VisibleForTesting;
-import java.io.Closeable;
+import com.google.common.collect.MapMaker;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
@@ -29,7 +29,6 @@ import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.WeakHashMap;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
@@ -37,6 +36,7 @@ import
org.apache.pinot.segment.spi.memory.unsafe.UnsafePinotBufferFactory;
import org.apache.pinot.segment.spi.utils.JavaVersion;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.plugin.PluginManager;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,7 +57,7 @@ import org.slf4j.LoggerFactory;
* </ul>
*/
@ThreadSafe
-public abstract class PinotDataBuffer implements Closeable {
+public abstract class PinotDataBuffer implements DataBuffer {
private static final Logger LOGGER =
LoggerFactory.getLogger(PinotDataBuffer.class);
public static final ByteOrder NATIVE_ORDER = ByteOrder.nativeOrder();
@@ -71,8 +71,8 @@ public abstract class PinotDataBuffer implements Closeable {
// Default value of the "prioritize byte buffer" flag. It is true unless the environment variable named by
// SKIP_BYTEBUFFER_ENV is set to "true" (case-insensitive); an unset variable parses as false.
private static final boolean DEFAULT_PRIORITIZE_BYTE_BUFFER;

static {
  boolean skipByteBuffer = Boolean.parseBoolean(System.getenv(SKIP_BYTEBUFFER_ENV));
  DEFAULT_PRIORITIZE_BYTE_BUFFER = !skipByteBuffer;
}
private static class BufferContext {
@@ -110,7 +110,8 @@ public abstract class PinotDataBuffer implements Closeable {
private static final AtomicLong MMAP_BUFFER_COUNT = new AtomicLong();
private static final AtomicLong MMAP_BUFFER_USAGE = new AtomicLong();
private static final AtomicLong ALLOCATION_FAILURE_COUNT = new AtomicLong();
- private static final Map<PinotDataBuffer, BufferContext> BUFFER_CONTEXT_MAP
= new WeakHashMap<>();
+ // we need to use MapMaker instead of WeakHashMap because we want to use
identity comparison for the keys
+ private static final Map<PinotDataBuffer, BufferContext> BUFFER_CONTEXT_MAP
= new MapMaker().weakKeys().makeMap();
/**
* Configuration key used to change the offheap buffer factory used by Pinot.
@@ -233,9 +234,7 @@ public abstract class PinotDataBuffer implements Closeable {
}
DIRECT_BUFFER_COUNT.getAndIncrement();
DIRECT_BUFFER_USAGE.getAndAdd(size);
- synchronized (BUFFER_CONTEXT_MAP) {
- BUFFER_CONTEXT_MAP.put(buffer, new
BufferContext(BufferContext.Type.DIRECT, size, null, description));
- }
+ BUFFER_CONTEXT_MAP.put(buffer, new
BufferContext(BufferContext.Type.DIRECT, size, null, description));
return buffer;
}
@@ -257,10 +256,8 @@ public abstract class PinotDataBuffer implements Closeable
{
}
DIRECT_BUFFER_COUNT.getAndIncrement();
DIRECT_BUFFER_USAGE.getAndAdd(size);
- synchronized (BUFFER_CONTEXT_MAP) {
- BUFFER_CONTEXT_MAP.put(buffer,
- new BufferContext(BufferContext.Type.DIRECT, size,
file.getAbsolutePath().intern(), description));
- }
+ BUFFER_CONTEXT_MAP.put(buffer,
+ new BufferContext(BufferContext.Type.DIRECT, size,
file.getAbsolutePath().intern(), description));
return buffer;
}
@@ -292,10 +289,8 @@ public abstract class PinotDataBuffer implements Closeable
{
}
MMAP_BUFFER_COUNT.getAndIncrement();
MMAP_BUFFER_USAGE.getAndAdd(size);
- synchronized (BUFFER_CONTEXT_MAP) {
- BUFFER_CONTEXT_MAP
- .put(buffer, new BufferContext(BufferContext.Type.MMAP, size,
file.getAbsolutePath().intern(), description));
- }
+ BUFFER_CONTEXT_MAP.put(buffer,
+ new BufferContext(BufferContext.Type.MMAP, size,
file.getAbsolutePath().intern(), description));
return buffer;
}
@@ -338,19 +333,20 @@ public abstract class PinotDataBuffer implements
Closeable {
}
public static List<String> getBufferInfo() {
- synchronized (BUFFER_CONTEXT_MAP) {
- List<String> bufferInfo = new ArrayList<>(BUFFER_CONTEXT_MAP.size());
- for (BufferContext bufferContext : BUFFER_CONTEXT_MAP.values()) {
- bufferInfo.add(bufferContext.toString());
- }
- return bufferInfo;
+ List<String> bufferInfo = new ArrayList<>(BUFFER_CONTEXT_MAP.size());
+ for (BufferContext bufferContext : BUFFER_CONTEXT_MAP.values()) {
+ bufferInfo.add(bufferContext.toString());
}
+ return bufferInfo;
}
private static String getBufferStats() {
- return String
- .format("Direct buffer count: %s, size: %s; Mmap buffer count: %s,
size: %s", DIRECT_BUFFER_COUNT.get(),
- DIRECT_BUFFER_USAGE.get(), MMAP_BUFFER_COUNT.get(),
MMAP_BUFFER_USAGE.get());
+ return String.format("Direct buffer count: %s, size: %s; Mmap buffer
count: %s, size: %s",
+ DIRECT_BUFFER_COUNT.get(), DIRECT_BUFFER_USAGE.get(),
MMAP_BUFFER_COUNT.get(), MMAP_BUFFER_USAGE.get());
+ }
+
+ public static PinotDataBuffer empty() {
+ return PinotByteBuffer.EMPTY;
}
private volatile boolean _closeable;
@@ -363,12 +359,8 @@ public abstract class PinotDataBuffer implements Closeable
{
public synchronized void close()
throws IOException {
if (_closeable) {
- flush();
- release();
BufferContext bufferContext;
- synchronized (BUFFER_CONTEXT_MAP) {
- bufferContext = BUFFER_CONTEXT_MAP.remove(this);
- }
+ bufferContext = BUFFER_CONTEXT_MAP.remove(this);
if (bufferContext != null) {
if (bufferContext._type == BufferContext.Type.DIRECT) {
DIRECT_BUFFER_COUNT.getAndDecrement();
@@ -378,98 +370,129 @@ public abstract class PinotDataBuffer implements
Closeable {
MMAP_BUFFER_USAGE.getAndAdd(-bufferContext._size);
}
}
+ flush();
+ release();
_closeable = false;
}
}
+ @Override
public byte getByte(int offset) {
return getByte((long) offset);
}
+ @Override
public abstract byte getByte(long offset);
+ @Override
public void putByte(int offset, byte value) {
putByte((long) offset, value);
}
+ @Override
public abstract void putByte(long offset, byte value);
+ @Override
public char getChar(int offset) {
return getChar((long) offset);
}
+ @Override
public abstract char getChar(long offset);
+ @Override
public void putChar(int offset, char value) {
putChar((long) offset, value);
}
+ @Override
public abstract void putChar(long offset, char value);
+ @Override
public short getShort(int offset) {
return getShort((long) offset);
}
+ @Override
public abstract short getShort(long offset);
+ @Override
public void putShort(int offset, short value) {
putShort((long) offset, value);
}
+ @Override
public abstract void putShort(long offset, short value);
+ @Override
public int getInt(int offset) {
return getInt((long) offset);
}
+ @Override
public abstract int getInt(long offset);
+ @Override
public void putInt(int offset, int value) {
putInt((long) offset, value);
}
+ @Override
public abstract void putInt(long offset, int value);
+ @Override
public long getLong(int offset) {
return getLong((long) offset);
}
+ @Override
public abstract long getLong(long offset);
+ @Override
public void putLong(int offset, long value) {
putLong((long) offset, value);
}
+ @Override
public abstract void putLong(long offset, long value);
+ @Override
public float getFloat(int offset) {
return getFloat((long) offset);
}
+ @Override
public abstract float getFloat(long offset);
+ @Override
public void putFloat(int offset, float value) {
putFloat((long) offset, value);
}
+ @Override
public abstract void putFloat(long offset, float value);
+ @Override
public double getDouble(int offset) {
return getDouble((long) offset);
}
+ @Override
public abstract double getDouble(long offset);
+ @Override
public void putDouble(int offset, double value) {
putDouble((long) offset, value);
}
+ @Override
public abstract void putDouble(long offset, double value);
/**
* Given an array of bytes, copies the content of this object into the array
of bytes.
* The first byte to be copied is the one that could be read with {@code
this.getByte(offset)}
*/
+ @Override
public void copyTo(long offset, byte[] buffer, int destOffset, int size) {
if (size <= BULK_BYTES_PROCESSING_THRESHOLD) {
int end = destOffset + size;
@@ -485,6 +508,7 @@ public abstract class PinotDataBuffer implements Closeable {
* Given an array of bytes, copies the content of this object into the array
of bytes.
* The first byte to be copied is the one that could be read with {@code
this.getByte(offset)}
*/
+ @Override
public void copyTo(long offset, byte[] buffer) {
copyTo(offset, buffer, 0, buffer.length);
}
@@ -495,31 +519,51 @@ public abstract class PinotDataBuffer implements
Closeable {
* overriding priority, as when methods of this class are optimized by the
runtime compiler, some or all checks
* (if any) may be elided. Hence, the caller must not rely on the checks and
corresponding exceptions!
*/
- public void copyTo(long offset, PinotDataBuffer buffer, long destOffset,
long size) {
- int pageSize = Integer.MAX_VALUE;
- long alreadyCopied = 0;
+ @Override
+ public void copyTo(long offset, DataBuffer buffer, long destOffset, long
size) {
+ if (buffer instanceof PinotDataBuffer) {
+ int pageSize = Integer.MAX_VALUE;
+ long alreadyCopied = 0;
- while (size - alreadyCopied > 0L) {
- int step;
- long remaining = size - alreadyCopied;
+ while (size - alreadyCopied > 0L) {
+ int step;
+ long remaining = size - alreadyCopied;
- if (remaining > pageSize) {
- step = pageSize;
- } else {
- step = (int) remaining;
- }
- ByteBuffer destBb = buffer.toDirectByteBuffer(destOffset +
alreadyCopied, step);
- ByteBuffer myView = toDirectByteBuffer(offset + alreadyCopied, step);
+ if (remaining > pageSize) {
+ step = pageSize;
+ } else {
+ step = (int) remaining;
+ }
+ ByteBuffer destBb = ((PinotDataBuffer)
buffer).toDirectByteBuffer(destOffset + alreadyCopied, step);
+ ByteBuffer myView = toDirectByteBuffer(offset + alreadyCopied, step);
- destBb.put(myView);
+ destBb.put(myView);
- alreadyCopied += step;
+ alreadyCopied += step;
+ }
+ } else {
+ byte[] temp = new byte[BULK_BYTES_PROCESSING_THRESHOLD];
+ long alreadyCopied = 0;
+ while (size - alreadyCopied > 0L) {
+ int step;
+ long remaining = size - alreadyCopied;
+
+ if (remaining > BULK_BYTES_PROCESSING_THRESHOLD) {
+ step = BULK_BYTES_PROCESSING_THRESHOLD;
+ } else {
+ step = (int) remaining;
+ }
+ copyTo(offset + alreadyCopied, temp, 0, step);
+ buffer.readFrom(destOffset + alreadyCopied, temp, 0, step);
+ alreadyCopied += step;
+ }
}
}
/**
* Given an array of bytes, writes the content in the specified position.
*/
+ @Override
public void readFrom(long offset, byte[] buffer, int srcOffset, int size) {
if (offset + size > size()) {
throw new IndexOutOfBoundsException("Buffer overflow: offset = " +
offset + ", size = " + size
@@ -536,19 +580,20 @@ public abstract class PinotDataBuffer implements
Closeable {
}
}
+ // Writes the whole byte array into this buffer, starting at offset.
+ @Override
public void readFrom(long offset, byte[] buffer) {
readFrom(offset, buffer, 0, buffer.length);
}
+ // Writes the remaining bytes of the given ByteBuffer into this buffer, starting at offset.
+ // ByteBuffer.put(ByteBuffer) advances the source buffer's position up to its limit.
+ @Override
public void readFrom(long offset, ByteBuffer buffer) {
toDirectByteBuffer(offset, buffer.remaining()).put(buffer);
}
+ @Override
public void readFrom(long offset, File file, long srcOffset, long size)
throws IOException {
- try (
- RandomAccessFile raf = new RandomAccessFile(file, "r");
- FileChannel fileChannel = raf.getChannel()) {
+ try (RandomAccessFile raf = new RandomAccessFile(file, "r"); FileChannel
fileChannel = raf.getChannel()) {
int step = Integer.MAX_VALUE / 2;
while (size > Integer.MAX_VALUE) {
ByteBuffer bb = toDirectByteBuffer(offset, step);
@@ -562,24 +607,34 @@ public abstract class PinotDataBuffer implements
Closeable {
}
}
+ // Length of this buffer in bytes; readFrom checks offset + size against it before writing.
+ @Override
public abstract long size();
+ // Byte order used by view(start, end) and by toDirectByteBuffer(offset, size).
+ @Override
public abstract ByteOrder order();
/**
* Creates a view of the range [start, end) of this buffer with the given
byte order. Calling {@link #flush()} or
* {@link #close()} has no effect on view.
*/
+ @Override
public abstract PinotDataBuffer view(long start, long end, ByteOrder
byteOrder);
/**
* Creates a view of the range [start, end) of this buffer with the current
byte order. Calling {@link #flush()} or
* {@link #close()} has no effect on view.
*/
+ @Override
public PinotDataBuffer view(long start, long end) {
return view(start, end, order());
}
+  /**
+   * Interprets {@code length} bytes starting at {@code offset} as a serialized Roaring bitmap.
+   * <p>
+   * The bitmap is built on top of the ByteBuffer returned by {@code toDirectByteBuffer}, which shares content with this
+   * buffer. That ByteBuffer is little-endian, presumably because the serialized Roaring format is little-endian
+   * (NOTE(review): confirm against the RoaringBitmap format specification).
+   */
+  @Override
+  public ImmutableRoaringBitmap viewAsRoaringBitmap(long offset, int length) {
+    return new ImmutableRoaringBitmap(toDirectByteBuffer(offset, length, ByteOrder.LITTLE_ENDIAN));
+  }
+
/**
* Returns an ByteBuffer with the same content of this buffer.
*
@@ -616,9 +671,6 @@ public abstract class PinotDataBuffer implements Closeable {
* <li>A write made by either the receiver or the returned ByteBuffer will
be seen by the other.</li>
* </ol>
*
- * Depending on the implementation, this may be a view (and therefore
changes on any buffer will be seen by the other)
- * or a copy (in which case the cost will be higher, but each copy will have
their own lifecycle).
- *
*/
// TODO: Most calls to this method are just used to then read the content of
the buffer.
// This is unnecessary an generates 2-5 unnecessary objects. We should
benchmark whether there is some advantage on
@@ -627,6 +679,23 @@ public abstract class PinotDataBuffer implements Closeable
{
return toDirectByteBuffer(offset, size, order());
}
+ @Override
+ public ByteBuffer copyOrView(long offset, int size, ByteOrder byteOrder) {
+ return toDirectByteBuffer(offset, size, byteOrder);
+ }
+
+  /**
+   * Appends to {@code appendTo}, in order, ByteBuffers that together cover the whole content of this buffer.
+   * <p>
+   * A ByteBuffer can hold at most {@link Integer#MAX_VALUE} bytes, so a larger buffer is split into consecutive chunks
+   * of at most that size. Each chunk comes from {@code copyOrView}.
+   *
+   * @param appendTo the list the ByteBuffers are added to
+   */
+  @Override
+  public void appendAsByteBuffers(List<ByteBuffer> appendTo) {
+    long size = size();
+    long offset = 0;
+    while (offset < size) {
+      int byteBufferSize = (int) Math.min(size - offset, Integer.MAX_VALUE);
+      appendTo.add(copyOrView(offset, byteBufferSize));
+      // Advance by the chunk just appended. Advancing by the total size would silently drop every chunk after the
+      // first one for buffers larger than Integer.MAX_VALUE bytes.
+      offset += byteBufferSize;
+    }
+  }
+
+ // Per the view(...) javadoc above, calling flush() on a view has no effect.
+ @Override
public abstract void flush();
public abstract void release()
@@ -644,8 +713,25 @@ public abstract class PinotDataBuffer implements Closeable
{
throw new IllegalArgumentException("Size " + size + " cannot be
negative");
}
if (offset + size > capacity) {
- throw new IllegalArgumentException("Size (" + size + ") + offset (" +
offset + ") exceeds the capacity of "
- + capacity);
+ throw new IllegalArgumentException(
+ "Size (" + size + ") + offset (" + offset + ") exceeds the capacity
of " + capacity);
+ }
+ }
+
+ // Content-based equality: o may be any DataBuffer, not only a PinotDataBuffer.
+ // Whether the contents match is decided by DataBuffer.sameContent.
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof DataBuffer)) {
+ return false;
}
+ DataBuffer buffer = (DataBuffer) o;
+ return DataBuffer.sameContent(this, buffer);
+ }
+
+ // Delegates to DataBuffer.commonHashCode so hashing follows the same content-based contract as equals.
+ // NOTE(review): assumes commonHashCode is consistent with sameContent; confirm in DataBuffer.
+ @Override
+ public int hashCode() {
+ return DataBuffer.commonHashCode(this);
}
}
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/PinotInputStream.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/PinotInputStream.java
new file mode 100644
index 0000000000..a7b8044b33
--- /dev/null
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/PinotInputStream.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.memory;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import org.apache.commons.lang3.StringUtils;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+
+/**
+ * A {@link DataInput} that also supports moving the cursor with {@link #seek(long)}, plus some other features
+ * used in Apache Pinot.
+ * <p>
+ * This class is based on Parquet's SeekableInputStream.
+ */
+public abstract class PinotInputStream extends InputStream implements DataInput {
+
+  /**
+   * Return the current position in the InputStream.
+   *
+   * @return current position in bytes from the start of the stream
+   */
+  public abstract long getCurrentOffset();
+
+  /**
+   * Seek to a new position in the InputStream.
+   *
+   * @param newPos the new position to seek to
+   * @throws IllegalArgumentException If the new position is negative or greater than the length of the stream
+   */
+  public abstract void seek(long newPos);
+
+  /**
+   * Read {@code buf.remaining()} bytes of data into a {@link ByteBuffer}.
+   * <p>
+   * This method will copy available bytes into the buffer, reading at most
+   * {@code buf.remaining()} bytes. The number of bytes actually copied is
+   * returned by the method, or -1 is returned to signal that the end of the
+   * underlying stream has been reached.
+   *
+   * @param buf a byte buffer to fill with data from the stream
+   * @return the number of bytes read or -1 if the stream ended. It may be 0 if there are no bytes available in the
+   * stream
+   * @throws IOException If the underlying stream throws IOException
+   */
+  public abstract int read(ByteBuffer buf) throws IOException;
+
+  /**
+   * Reads a string stored as a 4-byte {@code int} length (read with {@link #readInt()}) followed by that many bytes
+   * of UTF-8 data.
+   * <p>
+   * <b>Note</b>: this is not the encoding used by {@link DataInput#readUTF()}.
+   *
+   * @return the decoded string, or {@link StringUtils#EMPTY} when the length prefix is 0
+   * @throws IOException if the length prefix is negative (corrupt data), or if reading the prefix or the bytes fails,
+   *                     for example because the stream ends too early
+   */
+  public String readInt4UTF()
+      throws IOException {
+    int length = readInt();
+    if (length < 0) {
+      // Without this check a corrupt prefix would surface as an obscure NegativeArraySizeException.
+      throw new IOException("Invalid negative string length: " + length);
+    }
+    if (length == 0) {
+      return StringUtils.EMPTY;
+    }
+    byte[] bytes = new byte[length];
+    readFully(bytes);
+    return new String(bytes, UTF_8);
+  }
+
+  /**
+   * Returns the number of bytes that can still be read from this stream, as a {@code long}.
+   * <p>
+   * Use this instead of {@link #available()} when the value may not fit in an {@code int}.
+   */
+  public abstract long availableLong();
+
+  /**
+   * Skips up to {@code n} bytes by moving the cursor forward, never beyond the bytes reported by {@link #available()}.
+   *
+   * @return the number of bytes actually skipped; 0 when {@code n} is not positive
+   */
+  @Override
+  public int skipBytes(int n) {
+    if (n <= 0) {
+      return 0;
+    }
+    int step = Math.min(available(), n);
+    seek(getCurrentOffset() + step);
+    return step;
+  }
+
+  /**
+   * Returns {@link #availableLong()}, clamped to {@link Integer#MAX_VALUE} so it fits the {@link InputStream} contract.
+   */
+  @Override
+  public int available() {
+    long available = availableLong();
+    if (available > Integer.MAX_VALUE) {
+      return Integer.MAX_VALUE;
+    } else {
+      return (int) available;
+    }
+  }
+
+  /**
+   * Fills {@code b} completely by delegating to {@code readFully(b, 0, b.length)}.
+   */
+  @Override
+  public void readFully(byte[] b)
+      throws IOException {
+    readFully(b, 0, b.length);
+  }
+}
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/PinotOutputStream.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/PinotOutputStream.java
new file mode 100644
index 0000000000..db8a24313c
--- /dev/null
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/PinotOutputStream.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.memory;
+
+import com.google.common.primitives.Longs;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+
+
+public abstract class PinotOutputStream extends OutputStream implements
DataOutput {
+
+ /**
+ * Return the current position in the OutputStream.
+ *
+ * @return current position in bytes from the start of the stream
+ */
+ public abstract long getCurrentOffset();
+
+ /**
+ * Seek to a new position in the OutputStream.
+ *
+ * @param newPos the new position to seek to
+ * @throws IllegalArgumentException If the new position is negative
+ */
+ public abstract void seek(long newPos);
+
+ /**
+ * Moves the current offset, applying the given change.
+ * @param change the change to apply to the current offset
+ * @throws IllegalArgumentException if the new position is negative
+ */
+ public void moveCurrentOffset(long change) {
+ long newOffset = getCurrentOffset() + change;
+ seek(newOffset);
+ }
+
+ @Override
+ public void writeBoolean(boolean v) throws IOException {
+ write(v ? 1 : 0);
+ }
+
+ @Override
+ public void writeByte(int v) throws IOException {
+ write(v);
+ }
+
+ @Override
+ public void writeChar(int v) throws IOException {
+ writeShort(v);
+ }
+
+ @Override
+ public void writeChars(String s) throws IOException {
+ for (int i = 0; i < s.length(); i++) {
+ writeChar(s.charAt(i));
+ }
+ }
+
+ public void writeDouble(double v) throws IOException {
+ writeLong(Double.doubleToLongBits(v));
+ }
+
+ public void writeFloat(float v) throws IOException {
+ writeInt(Float.floatToIntBits(v));
+ }
+
+ public void writeInt(int v) throws IOException {
+ write(0xFF & (v >> 24));
+ write(0xFF & (v >> 16));
+ write(0xFF & (v >> 8));
+ write(0xFF & v);
+ }
+
+ public void writeLong(long v) throws IOException {
+ byte[] bytes = Longs.toByteArray(v);
+ write(bytes, 0, bytes.length);
+ }
+
+ public void writeShort(int v) throws IOException {
+ write(0xFF & (v >> 8));
+ write(0xFF & v);
+ }
+
+ /**
+ * This method is commonly used in Pinot to write a string with a 4-byte
length prefix.
+ * <p>
+ * <b>Note</b>: This method is incompatible with {@link
DataOutput#writeUTF(String)}. It is recommended to use
+ * this method instead of the one in {@link DataOutput} to write strings.
+ */
+ public void writeInt4String(String v) throws IOException {
+ writeInt(v.length());
+ write(v.getBytes(StandardCharsets.UTF_8));
+ }
+
+ @Override
+ public void writeBytes(String s)
+ throws IOException {
+ new DataOutputStream(this).writeBytes(s);
+ }
+
+ @Override
+ public void writeUTF(String s)
+ throws IOException {
+ new DataOutputStream(this).writeUTF(s);
+ }
+
+ public void write(DataBuffer input)
+ throws IOException {
+ write(input, 0, input.size());
+ }
+
+ public void write(DataBuffer input, long offset, long length)
+ throws IOException {
+ byte[] bytes = new byte[4096];
+ long currentOffset = offset;
+ while (currentOffset < offset + length) {
+ int bytesToRead = (int) Math.min(length - (currentOffset - offset),
bytes.length);
+ input.copyTo(currentOffset, bytes, 0, bytesToRead);
+ write(bytes, 0, bytesToRead);
+ currentOffset += bytesToRead;
+ }
+ }
+}
diff --git
a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/CompoundDataBufferTest.java
b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/CompoundDataBufferTest.java
new file mode 100644
index 0000000000..23a37d6fec
--- /dev/null
+++
b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/CompoundDataBufferTest.java
@@ -0,0 +1,313 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.memory;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.Random;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+public class CompoundDataBufferTest {
+
+  private static final int BUFFER_SIZE = 32;
+
+  private CompoundDataBuffer _compoundDataBuffer;
+
+  @BeforeMethod
+  public void setUp() {
+    // Three zero-filled buffers of BUFFER_SIZE bytes each, exposed as one big-endian buffer of 3 * BUFFER_SIZE bytes.
+    _compoundDataBuffer = new CompoundDataBuffer(new DataBuffer[]{
+        PinotByteBuffer.wrap(new byte[BUFFER_SIZE]),
+        PinotByteBuffer.wrap(new byte[BUFFER_SIZE]),
+        PinotByteBuffer.wrap(new byte[BUFFER_SIZE])},
+        ByteOrder.BIG_ENDIAN,
+        true
+    );
+  }
+
+  @DataProvider
+  Object[][] thingsToWrite() {
+    Random r = new Random(42);
+    byte[] bytes = new byte[BUFFER_SIZE];
+    r.nextBytes(bytes);
+    return new Object[][] {
+        {"int", r.nextInt()},
+        {"float", r.nextFloat()},
+        {"long", r.nextLong()},
+        {"double", r.nextDouble()},
+        {"bytes", bytes},
+        {"char", 'a'},
+        {"short", (short) r.nextInt()},
+        {"byte", (byte) r.nextInt()}
+    };
+  }
+
+  /** Returns {@code BUFFER_SIZE} random bytes drawn from {@code r}. */
+  private static byte[] randomBytes(Random r) {
+    byte[] bytes = new byte[BUFFER_SIZE];
+    r.nextBytes(bytes);
+    return bytes;
+  }
+
+  /** Position at which a write of {@code writeSize} bytes stays inside the first underlying buffer. */
+  private int positionToWriteInSingleBuffer(int writeSize) {
+    if (writeSize > BUFFER_SIZE) {
+      throw new IllegalArgumentException("Write size " + writeSize + " is greater than buffer size " + BUFFER_SIZE);
+    }
+    return 0;
+  }
+
+  /**
+   * Position at which a write of {@code writeSize} bytes ends one byte into the second underlying buffer.
+   * <p>
+   * A single-byte write cannot straddle two buffers; for it this returns the first position of the second buffer.
+   */
+  private int positionToWriteTwoBuffer(int writeSize) {
+    if (writeSize > 2 * BUFFER_SIZE) {
+      throw new IllegalArgumentException("Write size " + writeSize + " is greater than " + (2 * BUFFER_SIZE));
+    }
+    return BUFFER_SIZE - writeSize + 1;
+  }
+
+  private int positionFor(int writeSize, boolean acrossBuffers) {
+    return acrossBuffers ? positionToWriteTwoBuffer(writeSize) : positionToWriteInSingleBuffer(writeSize);
+  }
+
+  /**
+   * Writes {@code value} (of the type named by {@code dataType}), reads it back from the same position and asserts
+   * the read value equals the written one. The position is inside a single underlying buffer, or crosses into the
+   * second one when {@code acrossBuffers} is true.
+   */
+  private void assertWriteThenRead(String dataType, Object value, boolean acrossBuffers) {
+    int position;
+    switch (dataType) {
+      case "int":
+        position = positionFor(Integer.BYTES, acrossBuffers);
+        _compoundDataBuffer.putInt(position, (int) value);
+        assertEquals(_compoundDataBuffer.getInt(position), value);
+        break;
+      case "float":
+        position = positionFor(Float.BYTES, acrossBuffers);
+        _compoundDataBuffer.putFloat(position, (float) value);
+        assertEquals(_compoundDataBuffer.getFloat(position), value);
+        break;
+      case "long":
+        position = positionFor(Long.BYTES, acrossBuffers);
+        _compoundDataBuffer.putLong(position, (long) value);
+        assertEquals(_compoundDataBuffer.getLong(position), value);
+        break;
+      case "double":
+        position = positionFor(Double.BYTES, acrossBuffers);
+        _compoundDataBuffer.putDouble(position, (double) value);
+        assertEquals(_compoundDataBuffer.getDouble(position), value);
+        break;
+      case "char":
+        position = positionFor(Character.BYTES, acrossBuffers);
+        _compoundDataBuffer.putChar(position, (char) value);
+        assertEquals(_compoundDataBuffer.getChar(position), value);
+        break;
+      case "short":
+        position = positionFor(Short.BYTES, acrossBuffers);
+        _compoundDataBuffer.putShort(position, (short) value);
+        assertEquals(_compoundDataBuffer.getShort(position), value);
+        break;
+      case "byte":
+        position = positionFor(Byte.BYTES, acrossBuffers);
+        _compoundDataBuffer.putByte(position, (byte) value);
+        assertEquals(_compoundDataBuffer.getByte(position), value);
+        break;
+      case "bytes":
+        byte[] bytes = (byte[]) value;
+        position = positionFor(bytes.length, acrossBuffers);
+        _compoundDataBuffer.readFrom(position, bytes);
+        byte[] readBytes = new byte[bytes.length];
+        _compoundDataBuffer.copyTo(position, readBytes);
+        assertEquals(readBytes, bytes);
+        break;
+      default:
+        throw new IllegalArgumentException("Unsupported data type " + dataType);
+    }
+  }
+
+  @Test(dataProvider = "thingsToWrite")
+  public void writeOnSingleBuffer(String dataType, Object value) {
+    assertWriteThenRead(dataType, value, false);
+  }
+
+  @Test(dataProvider = "thingsToWrite")
+  public void writeBetweenBuffers(String dataType, Object value) {
+    assertWriteThenRead(dataType, value, true);
+  }
+
+  @Test
+  void readWriteByteBuffer() {
+    Random r = new Random(42);
+    byte[] bytes = randomBytes(r);
+    int position = positionToWriteTwoBuffer(bytes.length);
+
+    ByteBuffer buffer = ByteBuffer.wrap(bytes);
+    _compoundDataBuffer.readFrom(position, buffer);
+    ByteBuffer readBuffer = ByteBuffer.allocate(BUFFER_SIZE);
+    _compoundDataBuffer.copyTo(position, readBuffer, 0, BUFFER_SIZE);
+    assertEquals(readBuffer.array(), bytes);
+  }
+
+  @Test
+  void readWriteFile()
+      throws IOException {
+    Random r = new Random(42);
+    byte[] bytes = randomBytes(r);
+    int position = positionToWriteTwoBuffer(bytes.length);
+
+    File file = Files.createTempFile("test", "data").toFile();
+    try {
+      try (RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw")) {
+        randomAccessFile.setLength(BUFFER_SIZE);
+        try (BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(file))) {
+          bos.write(bytes);
+        }
+      }
+      _compoundDataBuffer.readFrom(position, file, 0, BUFFER_SIZE);
+      byte[] readBytes = new byte[BUFFER_SIZE];
+      _compoundDataBuffer.copyTo(position, readBytes);
+      assertEquals(readBytes, bytes);
+    } finally {
+      file.delete();
+    }
+  }
+
+  @Test
+  void testViewTotal() {
+    Random r = new Random(42);
+    byte[] bytes1 = randomBytes(r);
+    byte[] bytes2 = randomBytes(r);
+    byte[] bytes3 = randomBytes(r);
+
+    _compoundDataBuffer.readFrom(0, bytes1);
+    _compoundDataBuffer.readFrom(BUFFER_SIZE, bytes2);
+    _compoundDataBuffer.readFrom(BUFFER_SIZE * 2L, bytes3);
+
+    DataBuffer view = _compoundDataBuffer.view(0, BUFFER_SIZE * 3L);
+    assertEquals(view.size(), BUFFER_SIZE * 3L);
+    assertEquals(view.order(), ByteOrder.BIG_ENDIAN);
+    assertEquals(view, _compoundDataBuffer);
+  }
+
+  @Test
+  void testViewSeveralBuffers() {
+    Random r = new Random(42);
+    byte[] bytes1 = randomBytes(r);
+    byte[] bytes2 = randomBytes(r);
+    byte[] bytes3 = randomBytes(r);
+
+    // The view [1, 2 * BUFFER_SIZE + 1) covers the tail of the first buffer, all of the second one and the first byte
+    // of the third one.
+    byte[] totalBytes = new byte[BUFFER_SIZE * 2];
+    System.arraycopy(bytes1, 1, totalBytes, 0, BUFFER_SIZE - 1);
+    System.arraycopy(bytes2, 0, totalBytes, BUFFER_SIZE - 1, BUFFER_SIZE);
+    System.arraycopy(bytes3, 0, totalBytes, BUFFER_SIZE * 2 - 1, 1);
+
+    DataBuffer expected = PinotByteBuffer.wrap(totalBytes);
+
+    // prepare the compound buffer
+    _compoundDataBuffer.readFrom(0, bytes1);
+    _compoundDataBuffer.readFrom(BUFFER_SIZE, bytes2);
+    _compoundDataBuffer.readFrom(BUFFER_SIZE * 2L, bytes3);
+
+    // apply the view
+    DataBuffer view = _compoundDataBuffer.view(1, BUFFER_SIZE * 2L + 1);
+    assertEquals(view.size(), BUFFER_SIZE * 2L);
+    assertEquals(view.order(), ByteOrder.BIG_ENDIAN);
+
+    assertEquals(view, expected);
+  }
+
+  @Test
+  void testViewOneBuffer() {
+    Random r = new Random(42);
+    byte[] bytes1 = randomBytes(r);
+
+    _compoundDataBuffer.readFrom(0, bytes1);
+
+    DataBuffer expected = PinotByteBuffer.wrap(Arrays.copyOfRange(bytes1, 1, BUFFER_SIZE - 1));
+
+    DataBuffer view = _compoundDataBuffer.view(1, BUFFER_SIZE - 1);
+    assertEquals(view.size(), BUFFER_SIZE - 2);
+    assertEquals(view, expected);
+  }
+
+  @Test
+  void testCopyOrViewOneBuffer() {
+    Random r = new Random(42);
+    byte[] bytes1 = randomBytes(r);
+
+    _compoundDataBuffer.readFrom(0, bytes1);
+
+    ByteBuffer expected = ByteBuffer.wrap(bytes1, 1, BUFFER_SIZE - 1);
+
+    ByteBuffer copyOrView = _compoundDataBuffer.copyOrView(1, BUFFER_SIZE - 1);
+    assertEquals(copyOrView, expected);
+  }
+}
diff --git
a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/DataBufferPinotInputStreamTest.java
b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/DataBufferPinotInputStreamTest.java
new file mode 100644
index 0000000000..89318e22c8
--- /dev/null
+++
b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/DataBufferPinotInputStreamTest.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.memory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Random;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+public class DataBufferPinotInputStreamTest {
+
+  private static final int BUFFER_SIZE = 32;
+
+  // The same BUFFER_SIZE random bytes seen three ways: as a plain ByteBuffer (source of the expected values),
+  // as a DataBuffer and as the stream under test.
+  private ByteBuffer _byteBuffer;
+  private DataBuffer _dataBuffer;
+  private DataBufferPinotInputStream _dataBufferPinotInputStream;
+
+  @BeforeMethod
+  public void setUp() {
+    Random r = new Random(42);
+
+    byte[] buffer = new byte[BUFFER_SIZE];
+    r.nextBytes(buffer);
+
+    _byteBuffer = ByteBuffer.wrap(buffer).order(ByteOrder.BIG_ENDIAN);
+    _dataBuffer = PinotByteBuffer.wrap(_byteBuffer);
+    _dataBufferPinotInputStream = new DataBufferPinotInputStream(_dataBuffer);
+  }
+
+  @Test
+  void readByteBuffer() {
+    byte[] buffer = new byte[BUFFER_SIZE];
+    ByteBuffer wrap = ByteBuffer.wrap(buffer);
+    _dataBufferPinotInputStream.read(wrap);
+
+    // Compare the backing arrays. ByteBuffer.equals only compares the bytes between position and limit, so two fully
+    // consumed buffers would compare equal regardless of their content.
+    assertEquals(buffer, _byteBuffer.array());
+    assertEquals(_dataBufferPinotInputStream.getCurrentOffset(), BUFFER_SIZE);
+  }
+
+  @Test
+  void readByteMovesCursor() {
+    int read = _dataBufferPinotInputStream.read();
+    assertEquals(read, _byteBuffer.get(0) & 0xFF);
+    assertEquals(_dataBufferPinotInputStream.getCurrentOffset(), 1);
+  }
+
+  @Test
+  void seekMovesCursor() {
+    _dataBufferPinotInputStream.seek(1);
+    assertEquals(_dataBufferPinotInputStream.getCurrentOffset(), 1);
+
+    int read = _dataBufferPinotInputStream.read();
+    assertEquals(read, _byteBuffer.get(1) & 0xFF);
+    assertEquals(_dataBufferPinotInputStream.getCurrentOffset(), 2);
+  }
+
+  @Test
+  void readAtEndReturnsMinusOne() {
+    _dataBufferPinotInputStream.seek(BUFFER_SIZE);
+    assertEquals(_dataBufferPinotInputStream.read(), -1);
+  }
+
+  @Test
+  void testSkip() {
+    _dataBufferPinotInputStream.seek(1);
+    long skip = _dataBufferPinotInputStream.skip(2);
+    assertEquals(skip, 2);
+    assertEquals(_dataBufferPinotInputStream.getCurrentOffset(), 3);
+  }
+
+  @Test
+  void testSkipEnd() {
+    _dataBufferPinotInputStream.seek(BUFFER_SIZE - 1);
+    long skip = _dataBufferPinotInputStream.skip(2);
+    assertEquals(skip, 1);
+    assertEquals(_dataBufferPinotInputStream.getCurrentOffset(), BUFFER_SIZE);
+  }
+
+  @Test
+  void testAvailable() {
+    assertEquals(_dataBufferPinotInputStream.available(), BUFFER_SIZE);
+    _dataBufferPinotInputStream.seek(1);
+    assertEquals(_dataBufferPinotInputStream.available(), BUFFER_SIZE - 1);
+  }
+
+  @Test
+  void testReadInt()
+      throws EOFException {
+    int read = _dataBufferPinotInputStream.readInt();
+
+    assertEquals(read, _byteBuffer.getInt(0));
+    assertEquals(_dataBufferPinotInputStream.getCurrentOffset(), Integer.BYTES);
+  }
+
+  @Test
+  void testReadLong()
+      throws EOFException {
+    long read = _dataBufferPinotInputStream.readLong();
+
+    assertEquals(read, _byteBuffer.getLong(0));
+    assertEquals(_dataBufferPinotInputStream.getCurrentOffset(), Long.BYTES);
+  }
+
+  @Test
+  void testReadShort()
+      throws EOFException {
+    short read = _dataBufferPinotInputStream.readShort();
+
+    assertEquals(read, _byteBuffer.getShort(0));
+    assertEquals(_dataBufferPinotInputStream.getCurrentOffset(), Short.BYTES);
+  }
+
+  @Test
+  void testReadBoolean()
+      throws EOFException {
+    boolean read = _dataBufferPinotInputStream.readBoolean();
+
+    assertEquals(read, _byteBuffer.get(0) != 0);
+    assertEquals(_dataBufferPinotInputStream.getCurrentOffset(), 1);
+  }
+
+  @Test
+  void testReadByte()
+      throws EOFException {
+    int read = _dataBufferPinotInputStream.readByte();
+
+    // readByte returns a signed byte, so compare against the signed value. Masking with 0xFF would only match
+    // non-negative bytes.
+    assertEquals(read, _byteBuffer.get(0));
+    assertEquals(_dataBufferPinotInputStream.getCurrentOffset(), 1);
+  }
+
+  @Test
+  void testReadUnsignedByte()
+      throws EOFException {
+    int read = _dataBufferPinotInputStream.readUnsignedByte();
+
+    assertEquals(read, _byteBuffer.get(0) & 0xFF);
+    assertEquals(_dataBufferPinotInputStream.getCurrentOffset(), 1);
+  }
+
+  @Test
+  void testReadUnsignedShort()
+      throws EOFException {
+    int read = _dataBufferPinotInputStream.readUnsignedShort();
+
+    assertEquals(read, _byteBuffer.getShort(0) & 0xFFFF);
+    assertEquals(_dataBufferPinotInputStream.getCurrentOffset(), Short.BYTES);
+  }
+
+  @Test
+  void testReadFloat()
+      throws EOFException {
+    float read = _dataBufferPinotInputStream.readFloat();
+
+    assertEquals(read, _byteBuffer.getFloat(0));
+    assertEquals(_dataBufferPinotInputStream.getCurrentOffset(), Float.BYTES);
+  }
+
+  @Test
+  void testReadDouble()
+      throws EOFException {
+    double read = _dataBufferPinotInputStream.readDouble();
+
+    assertEquals(read, _byteBuffer.getDouble(0));
+    assertEquals(_dataBufferPinotInputStream.getCurrentOffset(), Double.BYTES);
+  }
+
+  @Test
+  void testReadChar()
+      throws EOFException {
+    char read = _dataBufferPinotInputStream.readChar();
+
+    assertEquals(read, _byteBuffer.getChar(0));
+    assertEquals(_dataBufferPinotInputStream.getCurrentOffset(), Character.BYTES);
+  }
+
+  @Test
+  void testReadFully()
+      throws IOException {
+    byte[] buffer = new byte[BUFFER_SIZE];
+    _dataBufferPinotInputStream.readFully(buffer);
+
+    for (int i = 0; i < BUFFER_SIZE; i++) {
+      assertEquals(buffer[i], _byteBuffer.get(i));
+    }
+    assertEquals(_dataBufferPinotInputStream.getCurrentOffset(), BUFFER_SIZE);
+  }
+}
diff --git
a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PagedPinotOutputStreamTest.java
b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PagedPinotOutputStreamTest.java
new file mode 100644
index 0000000000..f496511f5f
--- /dev/null
+++
b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PagedPinotOutputStreamTest.java
@@ -0,0 +1,314 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.memory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Random;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+public class PagedPinotOutputStreamTest {
+
+ int _pageSize;
+ PagedPinotOutputStream _pagedPinotOutputStream;
+
+ @BeforeMethod
+ public void setUp() {
+ _pagedPinotOutputStream = PagedPinotOutputStream.createHeap();
+ _pageSize = _pagedPinotOutputStream.getPageSize();
+ }
+
+ @Test
+ void writeByteInPage()
+ throws IOException {
+ byte b = 42;
+ _pagedPinotOutputStream.write(b);
+ assertEquals(_pagedPinotOutputStream.getCurrentOffset(), 1);
+
+ byte read = _pagedPinotOutputStream.asBuffer(ByteOrder.BIG_ENDIAN,
false).getByte(0);
+ assertEquals(read, b);
+ }
+
+ @Test
+ void writeByteAcrossPage()
+ throws IOException {
+ byte b = 42;
+
+ _pagedPinotOutputStream.seek(_pageSize);
+ _pagedPinotOutputStream.write(b);
+
+ assertEquals(_pagedPinotOutputStream.getPages().length, 2);
+
+ byte read = _pagedPinotOutputStream.asBuffer(ByteOrder.BIG_ENDIAN,
false).getByte(_pageSize);
+ assertEquals(read, b);
+ }
+
+ @Test
+ void writeIntInPage()
+ throws IOException {
+ int i = 42;
+ _pagedPinotOutputStream.writeInt(i);
+ assertEquals(_pagedPinotOutputStream.getCurrentOffset(), Integer.BYTES);
+
+ int read = _pagedPinotOutputStream.asBuffer(ByteOrder.BIG_ENDIAN,
false).getInt(0);
+ assertEquals(read, i);
+ }
+
+ @Test
+ void writeIntAcrossPage()
+ throws IOException {
+ int i = 42;
+
+ _pagedPinotOutputStream.seek(_pageSize - Integer.BYTES + 1);
+ _pagedPinotOutputStream.writeInt(i);
+
+ assertEquals(_pagedPinotOutputStream.getPages().length, 2);
+
+ int read = _pagedPinotOutputStream.asBuffer(ByteOrder.BIG_ENDIAN,
false).getInt(_pageSize - Integer.BYTES + 1);
+ assertEquals(read, i);
+ }
+
+ @Test
+ void writeLongInPage()
+ throws IOException {
+ long l = 42;
+ _pagedPinotOutputStream.writeLong(l);
+ assertEquals(_pagedPinotOutputStream.getCurrentOffset(), Long.BYTES);
+
+ long read = _pagedPinotOutputStream.asBuffer(ByteOrder.BIG_ENDIAN,
false).getLong(0);
+ assertEquals(read, l);
+ }
+
+ @Test
+ void writeLongAcrossPage()
+ throws IOException {
+ long l = 42;
+
+ _pagedPinotOutputStream.seek(_pageSize - Long.BYTES + 1);
+ _pagedPinotOutputStream.writeLong(l);
+
+ assertEquals(_pagedPinotOutputStream.getPages().length, 2);
+
+ long read = _pagedPinotOutputStream.asBuffer(ByteOrder.BIG_ENDIAN,
false).getLong(_pageSize - Long.BYTES + 1);
+ assertEquals(read, l);
+ }
+
+ @Test
+ void writeShortInPage()
+ throws IOException {
+ short s = 42;
+ _pagedPinotOutputStream.writeShort(s);
+ assertEquals(_pagedPinotOutputStream.getCurrentOffset(), Short.BYTES);
+
+ short read = _pagedPinotOutputStream.asBuffer(ByteOrder.BIG_ENDIAN,
false).getShort(0);
+ assertEquals(read, s);
+ }
+
+ @Test
+ void writeShortAcrossPage()
+ throws IOException {
+ short s = 42;
+
+ _pagedPinotOutputStream.seek(_pageSize - Short.BYTES + 1);
+ _pagedPinotOutputStream.writeShort(s);
+
+ assertEquals(_pagedPinotOutputStream.getPages().length, 2);
+
+ short read = _pagedPinotOutputStream.asBuffer(ByteOrder.BIG_ENDIAN,
false).getShort(_pageSize - Short.BYTES + 1);
+ assertEquals(read, s);
+ }
+
+ @Test
+ void seekIntoPageStart()
+ throws IOException {
+ int i = 42;
+ _pagedPinotOutputStream.seek(_pageSize);
+ assertEquals(_pagedPinotOutputStream.getCurrentOffset(), _pageSize);
+
+ _pagedPinotOutputStream.writeInt(i);
+
+ int read = _pagedPinotOutputStream.asBuffer(ByteOrder.BIG_ENDIAN,
false).getInt(_pageSize);
+ assertEquals(read, i);
+ }
+
+ @Test
+ void seekIntoNegative() {
+ assertThrows(IllegalArgumentException.class, () ->
_pagedPinotOutputStream.seek(-1));
+ }
+
+ @Test
+ void seekIntoNotExistentPage()
+ throws IOException {
+ int i = 42;
+ long newPosition = _pageSize * 10L + 2;
+ _pagedPinotOutputStream.seek(newPosition);
+
+ _pagedPinotOutputStream.writeInt(i);
+
+ int read = _pagedPinotOutputStream.asBuffer(ByteOrder.BIG_ENDIAN,
false).getInt(newPosition);
+ assertEquals(read, i);
+ }
+
+ @Test
+ void seekIntoStart()
+ throws IOException {
+ int i = 42;
+ _pagedPinotOutputStream.seek(123);
+ _pagedPinotOutputStream.seek(0);
+
+ _pagedPinotOutputStream.writeInt(i);
+
+ int read = _pagedPinotOutputStream.asBuffer(ByteOrder.BIG_ENDIAN,
false).getInt(0);
+ assertEquals(read, i);
+ }
+
+ @Test
+ void writeLargeByteArray()
+ throws IOException {
+ Random r = new Random(42);
+ byte[] bytes = new byte[_pageSize + 1];
+ r.nextBytes(bytes);
+
+ _pagedPinotOutputStream.write(bytes);
+
+ byte[] read = new byte[_pageSize + 1];
+ _pagedPinotOutputStream.asBuffer(ByteOrder.BIG_ENDIAN, false).copyTo(0,
read);
+
+ assertEquals(read, bytes);
+ assertEquals(_pagedPinotOutputStream.getPages().length, 2);
+ }
+
+ @Test
+ void writeSmallByteArray()
+ throws IOException {
+ Random r = new Random(42);
+ byte[] bytes = new byte[_pageSize / 2];
+ r.nextBytes(bytes);
+
+ _pagedPinotOutputStream.write(bytes);
+
+ byte[] read = new byte[_pageSize / 2];
+ _pagedPinotOutputStream.asBuffer(ByteOrder.BIG_ENDIAN, false).copyTo(0,
read);
+
+ assertEquals(read, bytes);
+ assertEquals(_pagedPinotOutputStream.getPages().length, 1);
+ }
+
+ @Test
+ void writeLargeDataInput()
+ throws IOException {
+ Random r = new Random(42);
+ byte[] bytes = new byte[_pageSize + 1];
+ r.nextBytes(bytes);
+
+ _pagedPinotOutputStream.write(PinotByteBuffer.wrap(bytes));
+
+ byte[] read = new byte[_pageSize + 1];
+ _pagedPinotOutputStream.asBuffer(ByteOrder.BIG_ENDIAN, false).copyTo(0,
read);
+
+ assertEquals(read, bytes);
+ assertEquals(_pagedPinotOutputStream.getPages().length, 2);
+ }
+
+ @Test
+ void writeSmallDataInput()
+ throws IOException {
+ Random r = new Random(42);
+ byte[] bytes = new byte[_pageSize / 2];
+ r.nextBytes(bytes);
+
+ _pagedPinotOutputStream.write(PinotByteBuffer.wrap(bytes));
+
+ byte[] read = new byte[_pageSize / 2];
+ _pagedPinotOutputStream.asBuffer(ByteOrder.BIG_ENDIAN, false).copyTo(0,
read);
+
+ assertEquals(read, bytes);
+ assertEquals(_pagedPinotOutputStream.getPages().length, 1);
+ }
+
+ @Test
+ void testGetPagesEmpty() {
+ ByteBuffer[] pages = _pagedPinotOutputStream.getPages();
+ assertEquals(pages.length, 0);
+ }
+
+ @Test
+ void testGetPagesSingleIntWrite()
+ throws IOException {
+ _pagedPinotOutputStream.writeInt(42);
+
+ ByteBuffer[] pages = _pagedPinotOutputStream.getPages();
+ assertEquals(pages.length, 1);
+ assertEquals(pages[0].position(), 0);
+ assertEquals(pages[0].limit(), 4);
+
+ assertEquals(pages[0].getInt(0), 42);
+ }
+
+ @Test
+ void testGetPagesAfterSeekingInTheMiddleOfAPage() {
+ _pagedPinotOutputStream.seek(_pageSize * 2L + 1);
+
+ ByteBuffer[] pages = _pagedPinotOutputStream.getPages();
+ assertEquals(pages.length, 3);
+
+ assertEquals(pages[0].position(), 0);
+ assertEquals(pages[0].limit(), _pageSize);
+
+ assertEquals(pages[1].position(), 0);
+ assertEquals(pages[1].limit(), _pageSize);
+
+ assertEquals(pages[2].position(), 0);
+ assertEquals(pages[2].limit(), 1);
+ }
+
+ @Test
+ void testGetPagesAfterSeekingInTheStartOfAPage() {
+ _pagedPinotOutputStream.seek(_pageSize * 2L);
+
+ ByteBuffer[] pages = _pagedPinotOutputStream.getPages();
+ assertEquals(pages.length, 2);
+
+ assertEquals(pages[0].position(), 0);
+ assertEquals(pages[0].limit(), _pageSize);
+
+ assertEquals(pages[1].position(), 0);
+ assertEquals(pages[1].limit(), _pageSize);
+ }
+
+ @Test
+ void testGetPagesUsesWrittenInsteadOfOffset() {
+ _pagedPinotOutputStream.seek(_pageSize * 2L);
+ _pagedPinotOutputStream.seek(1);
+
+ ByteBuffer[] pages = _pagedPinotOutputStream.getPages();
+ assertEquals(pages.length, 2);
+
+ assertEquals(pages[0].position(), 0);
+ assertEquals(pages[0].limit(), _pageSize);
+
+ assertEquals(pages[1].position(), 0);
+ assertEquals(pages[1].limit(), _pageSize);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]