This is an automated email from the ASF dual-hosted git repository.

gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 66676d9bf8 New buffers (#13304)
66676d9bf8 is described below

commit 66676d9bf89326a1850f7872062bec0b3e5115ad
Author: Gonzalo Ortiz Jaureguizar <[email protected]>
AuthorDate: Mon Jun 24 13:00:58 2024 +0200

    New buffers (#13304)
    
    Add some new IO and buffer classes:
    - DataBuffer, a new interface that contains most (if not all) methods of 
PinotDataBuffer, which now implements this interface
    - CompoundDataBuffer, a new class that wraps a list of DataBuffers and 
offers a contiguous DataBuffer without copying them.
    - PinotOutputStream, an OutputStream whose cursor can be moved.
    - PinotInputStream, the same, but for InputStream.
    - PagedPinotOutputStream, a PinotOutputStream that stores data into a list 
of ByteBuffers. When the last ByteBuffer in the list is filled, a new one is 
created.
    - DataBufferPinotInputStream, an adaptor that takes a DataBuffer and exposes 
it as a PinotInputStream.
---
 .../pinot/segment/spi/memory/BasePinotLBuffer.java |   2 +-
 .../segment/spi/memory/CompoundDataBuffer.java     | 674 +++++++++++++++++++++
 .../pinot/segment/spi/memory/DataBuffer.java       | 288 +++++++++
 .../spi/memory/DataBufferPinotInputStream.java     | 244 ++++++++
 .../spi/memory/NonNativePinotDataBuffer.java       |   2 +-
 .../segment/spi/memory/PagedPinotOutputStream.java | 406 +++++++++++++
 .../pinot/segment/spi/memory/PinotByteBuffer.java  |  14 +-
 .../pinot/segment/spi/memory/PinotDataBuffer.java  | 194 ++++--
 .../pinot/segment/spi/memory/PinotInputStream.java | 107 ++++
 .../segment/spi/memory/PinotOutputStream.java      | 142 +++++
 .../segment/spi/memory/CompoundDataBufferTest.java | 313 ++++++++++
 .../spi/memory/DataBufferPinotInputStreamTest.java | 208 +++++++
 .../spi/memory/PagedPinotOutputStreamTest.java     | 314 ++++++++++
 13 files changed, 2850 insertions(+), 58 deletions(-)

diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/BasePinotLBuffer.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/BasePinotLBuffer.java
index 0718c99082..9ba50911d2 100644
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/BasePinotLBuffer.java
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/BasePinotLBuffer.java
@@ -62,7 +62,7 @@ public abstract class BasePinotLBuffer extends 
PinotDataBuffer {
   }
 
   @Override
-  public void copyTo(long offset, PinotDataBuffer buffer, long destOffset, 
long size) {
+  public void copyTo(long offset, DataBuffer buffer, long destOffset, long 
size) {
     if (buffer instanceof BasePinotLBuffer) {
       _buffer.copyTo(offset, ((BasePinotLBuffer) buffer)._buffer, destOffset, 
size);
     } else {
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/CompoundDataBuffer.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/CompoundDataBuffer.java
new file mode 100644
index 0000000000..69e3522128
--- /dev/null
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/CompoundDataBuffer.java
@@ -0,0 +1,674 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.memory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.BufferOverflowException;
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.ArrayList;
+import java.util.List;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+
+
+/**
+ * A {@link DataBuffer} that is composed of multiple {@link DataBuffer}s that 
define a single contiguous buffer.
+ * <p>
+ * While reads and writes can span multiple buffers, there may be a 
performance impact when doing so.
+ * Therefore it is recommended to try to wrap independent buffers.
+ * <p>
+ * Once this class is built, buffers cannot be added or removed.
+ */
+public class CompoundDataBuffer implements DataBuffer {
+
+  private final DataBuffer[] _buffers;
+  private final long[] _bufferOffsets;
+  private int _lastBufferIndex = 0;
+  private final ByteOrder _order;
+  private final long _size;
+  private final boolean _owner;
+
+  /**
+   * Creates a compound buffer from the given buffers.
+   *
+   * @param buffers The buffers that will be concatenated to form the compound 
buffer.
+   * @param order The byte order of the buffer. Buffers in the array that have 
a different byte order will be converted.
+   * @param owner Whether this buffer owns the underlying buffers. If true, 
the underlying buffers will be released when
+   *              this buffer is closed.
+   */
+  public CompoundDataBuffer(DataBuffer[] buffers, ByteOrder order, boolean 
owner) {
+    _owner = owner;
+    _buffers = buffers;
+    _bufferOffsets = new long[buffers.length];
+    _order = order;
+    long offset = 0;
+    for (int i = 0; i < buffers.length; i++) {
+      if (buffers[i].size() == 0) {
+        throw new IllegalArgumentException("Buffer at index " + i + " is 
empty");
+      }
+      if (buffers[i].order() != _order) {
+        buffers[i] = buffers[i].view(0, buffers[i].size(), _order);
+      }
+    }
+    for (int i = 0; i < buffers.length; i++) {
+      _bufferOffsets[i] = offset;
+      long size = buffers[i].size();
+      offset += size;
+    }
+    _size = offset;
+  }
+
+  public CompoundDataBuffer(ByteBuffer[] buffers, ByteOrder order, boolean 
owner) {
+    this(asDataBufferArray(buffers), order, owner);
+  }
+
+  /**
+   * Creates a compound buffer from the given buffers.
+   * @param buffers The buffers that will be concatenated to form the compound 
buffer.
+   * @param order The byte order of the buffer. Buffers in the list that have 
a different byte order will be converted.
+   * @param owner Whether this buffer owns the underlying buffers. If true, 
the underlying buffers will be released when
+   *              this buffer is closed.
+   */
+  public CompoundDataBuffer(List<DataBuffer> buffers, ByteOrder order, boolean 
owner) {
+    this(buffers.toArray(new DataBuffer[0]), order, owner);
+  }
+
+  private static DataBuffer[] asDataBufferArray(ByteBuffer[] buffers) {
+    DataBuffer[] result = new DataBuffer[buffers.length];
+    for (int i = 0; i < buffers.length; i++) {
+      result[i] = PinotByteBuffer.wrap(buffers[i]);
+    }
+    return result;
+  }
+
+  private int getBufferIndex(long offset) {
+    // this optimistically assumes that lookups are going to be in ascending 
order
+    // we don't care about concurrency here given that this is only used to 
speed up the lookup
+    int lastBufferIndex = _lastBufferIndex;
+    if (_bufferOffsets[lastBufferIndex] > offset) {
+      lastBufferIndex = 0;
+    }
+
+    for (int i = lastBufferIndex; i < _bufferOffsets.length; i++) {
+      if (offset < _bufferOffsets[i]) {
+        int result = i - 1;
+        _lastBufferIndex = result;
+        return result;
+      }
+    }
+    int result = _bufferOffsets.length - 1;
+    _lastBufferIndex = result;
+    return result;
+  }
+
+  private ByteBuffer copy(long offset, int length) {
+    if (offset + length > _size) {
+      throw new BufferUnderflowException();
+    }
+    byte[] result = new byte[length];
+
+    int bufferIndex = getBufferIndex(offset);
+    long inBufferIndex = offset - _bufferOffsets[bufferIndex];
+
+    DataBuffer buffer = _buffers[bufferIndex];
+    int toCopy = (int) Math.min(length, buffer.size() - inBufferIndex);
+    buffer.copyTo(inBufferIndex, result, 0, toCopy);
+
+    int remaining = length - toCopy;
+    while (remaining > 0) {
+      bufferIndex++;
+      buffer = _buffers[bufferIndex];
+      toCopy = (int) Math.min(remaining, buffer.size());
+      buffer.copyTo(0, result, length - remaining, toCopy);
+
+      remaining -= toCopy;
+    }
+    return ByteBuffer.wrap(result).order(_order);
+  }
+
+  @Override
+  public void readFrom(long offset, byte[] input, int srcOffset, int size) {
+    if (offset + size > _size) {
+      throw new BufferOverflowException();
+    }
+
+    int bufferIndex = getBufferIndex(offset);
+    int remaining = size;
+    long inBufferIndex = offset - _bufferOffsets[bufferIndex];
+    while (remaining > 0) {
+      DataBuffer buffer = _buffers[bufferIndex];
+      int toRead = (int) Math.min(remaining, buffer.size() - inBufferIndex);
+      buffer.readFrom(inBufferIndex, input, srcOffset, toRead);
+
+      bufferIndex++;
+      remaining -= toRead;
+      srcOffset += toRead;
+      inBufferIndex = 0; // from now on we always write in the first position 
of the buffer
+    }
+  }
+
+  @Override
+  public void readFrom(long offset, ByteBuffer input) {
+    if (offset + input.remaining() > _size) {
+      throw new BufferOverflowException();
+    }
+
+    int startLimit = input.limit();
+    int bufferIndex = getBufferIndex(offset);
+    long inBufferIndex = offset - _bufferOffsets[bufferIndex];
+
+    while (input.hasRemaining()) {
+      DataBuffer buffer = _buffers[bufferIndex];
+      int toRead = (int) Math.min(input.remaining(), buffer.size() - 
inBufferIndex);
+
+      input.limit(input.position() + toRead);
+      buffer.readFrom(inBufferIndex, input);
+
+      input.position(input.limit());
+      input.limit(startLimit);
+      bufferIndex++;
+      inBufferIndex = 0; // from now on we always write in the first position 
of the buffer
+    }
+  }
+
+  @Override
+  public void readFrom(long offset, File file, long srcOffset, long size)
+      throws IOException {
+    if (offset + size > _size) {
+      throw new BufferOverflowException();
+    }
+
+    int bufferIndex = getBufferIndex(offset);
+    DataBuffer buffer = _buffers[bufferIndex];
+    long inBufferIndex = offset - _bufferOffsets[bufferIndex];
+    long toRead = Math.min(size, buffer.size() - inBufferIndex);
+    buffer.readFrom(inBufferIndex, file, srcOffset, toRead);
+
+    long fileOffset = srcOffset + toRead;
+    long remaining = size - toRead;
+    while (remaining > 0) {
+      bufferIndex++;
+      buffer = _buffers[bufferIndex];
+      toRead = Math.min(remaining, buffer.size());
+      buffer.readFrom(0, file, fileOffset, toRead);
+
+      remaining -= toRead;
+      fileOffset += toRead;
+    }
+  }
+
+  @Override
+  public void copyTo(long offset, byte[] buffer, int destOffset, int size) {
+    if (offset + size > _size) {
+      throw new BufferUnderflowException();
+    }
+
+    int bufferIndex = getBufferIndex(offset);
+    int remaining = size;
+
+    DataBuffer bufferToCopy = _buffers[bufferIndex];
+    long inBufferIndex = offset - _bufferOffsets[bufferIndex];
+    int toCopy = (int) Math.min(remaining, bufferToCopy.size() - 
inBufferIndex);
+    bufferToCopy.copyTo(inBufferIndex, buffer, destOffset, toCopy);
+
+    remaining -= toCopy;
+    destOffset += toCopy;
+
+    while (remaining > 0) {
+      bufferIndex++;
+      bufferToCopy = _buffers[bufferIndex];
+      toCopy = (int) Math.min(remaining, bufferToCopy.size());
+      bufferToCopy.copyTo(0, buffer, destOffset, toCopy);
+
+      remaining -= toCopy;
+      destOffset += toCopy;
+    }
+  }
+
+  @Override
+  public void copyTo(long offset, DataBuffer buffer, long destOffset, long 
size) {
+    if (offset + size > _size) {
+      throw new BufferUnderflowException();
+    }
+
+    int bufferIndex = getBufferIndex(offset);
+    long remaining = size;
+
+    DataBuffer bufferToCopy = _buffers[bufferIndex];
+    long inBufferIndex = offset - _bufferOffsets[bufferIndex];
+    long toCopy = Math.min(remaining, bufferToCopy.size() - inBufferIndex);
+    bufferToCopy.copyTo(inBufferIndex, buffer, destOffset, toCopy);
+
+    bufferIndex++;
+    remaining -= toCopy;
+    destOffset += toCopy;
+
+    while (remaining > 0) {
+      bufferToCopy = _buffers[bufferIndex];
+      toCopy = Math.min(remaining, bufferToCopy.size());
+      bufferToCopy.copyTo(0, buffer, destOffset, toCopy);
+
+      bufferIndex++;
+      remaining -= toCopy;
+      destOffset += toCopy;
+    }
+  }
+
+  @Override
+  public byte getByte(long offset) {
+    int bufferIndex = getBufferIndex(offset);
+    DataBuffer buffer = _buffers[bufferIndex];
+    long inBufferIndex = offset - _bufferOffsets[bufferIndex];
+    return buffer.getByte(inBufferIndex);
+  }
+
+  @Override
+  public void putByte(long offset, byte value) {
+    int bufferIndex = getBufferIndex(offset);
+    DataBuffer buffer = _buffers[bufferIndex];
+    long inBufferIndex = offset - _bufferOffsets[bufferIndex];
+    buffer.putByte(inBufferIndex, value);
+  }
+
+  @Override
+  public char getChar(long offset) {
+    int bufferIndex = getBufferIndex(offset);
+    DataBuffer buffer = _buffers[bufferIndex];
+    long inBufferIndex = offset - _bufferOffsets[bufferIndex];
+    if (inBufferIndex + Character.BYTES <= buffer.size()) {
+      // fast path
+      return buffer.getChar(inBufferIndex);
+    } else {
+      // slow path
+      ByteBuffer byteBuffer = copy(offset, Character.BYTES);
+      return byteBuffer.getChar();
+    }
+  }
+
+  @Override
+  public void putChar(long offset, char value) {
+    int bufferIndex = getBufferIndex(offset);
+    DataBuffer buffer = _buffers[bufferIndex];
+    long inBufferIndex = offset - _bufferOffsets[bufferIndex];
+    if (inBufferIndex + Character.BYTES <= buffer.size()) {
+      // fast path
+      buffer.putChar(inBufferIndex, value);
+    } else {
+      // slow path
+      ByteBuffer byteBuffer = ByteBuffer.allocate(Character.BYTES)
+          .order(_order)
+          .putChar(value);
+      byteBuffer.flip();
+      readFrom(offset, byteBuffer);
+    }
+  }
+
+  @Override
+  public short getShort(long offset) {
+    int bufferIndex = getBufferIndex(offset);
+    DataBuffer buffer = _buffers[bufferIndex];
+    long inBufferIndex = offset - _bufferOffsets[bufferIndex];
+    if (inBufferIndex + Short.BYTES <= buffer.size()) {
+      // fast path
+      return buffer.getShort(inBufferIndex);
+    } else {
+      // slow path
+      ByteBuffer byteBuffer = copy(offset, Short.BYTES);
+      return byteBuffer.getShort();
+    }
+  }
+
+  @Override
+  public void putShort(long offset, short value) {
+    int bufferIndex = getBufferIndex(offset);
+    DataBuffer buffer = _buffers[bufferIndex];
+    long inBufferIndex = offset - _bufferOffsets[bufferIndex];
+    if (inBufferIndex + Short.BYTES <= buffer.size()) {
+      // fast path
+      buffer.putShort(inBufferIndex, value);
+    } else {
+      // slow path
+      ByteBuffer byteBuffer = ByteBuffer.allocate(Short.BYTES)
+          .order(_order)
+          .putShort(value);
+      byteBuffer.flip();
+      readFrom(offset, byteBuffer);
+    }
+  }
+
+  @Override
+  public int getInt(long offset) {
+    int bufferIndex = getBufferIndex(offset);
+    DataBuffer buffer = _buffers[bufferIndex];
+    long inBufferIndex = offset - _bufferOffsets[bufferIndex];
+    if (inBufferIndex + Integer.BYTES <= buffer.size()) {
+      // fast path
+      return buffer.getInt(inBufferIndex);
+    } else {
+      // slow path
+      ByteBuffer byteBuffer = copy(offset, Integer.BYTES);
+      return byteBuffer.getInt();
+    }
+  }
+
+  @Override
+  public void putInt(long offset, int value) {
+    int bufferIndex = getBufferIndex(offset);
+    DataBuffer buffer = _buffers[bufferIndex];
+    long inBufferIndex = offset - _bufferOffsets[bufferIndex];
+    if (inBufferIndex + Integer.BYTES <= buffer.size()) {
+      // fast path
+      buffer.putInt(inBufferIndex, value);
+    } else {
+      // slow path
+      ByteBuffer byteBuffer = ByteBuffer.allocate(Integer.BYTES)
+          .order(_order)
+          .putInt(value);
+      byteBuffer.flip();
+      readFrom(offset, byteBuffer);
+    }
+  }
+
+  @Override
+  public long getLong(long offset) {
+    int bufferIndex = getBufferIndex(offset);
+    DataBuffer buffer = _buffers[bufferIndex];
+    long inBufferIndex = offset - _bufferOffsets[bufferIndex];
+    if (inBufferIndex + Long.BYTES <= buffer.size()) {
+      // fast path
+      return buffer.getLong(inBufferIndex);
+    } else {
+      // slow path
+      ByteBuffer byteBuffer = copy(offset, Long.BYTES);
+      return byteBuffer.getLong();
+    }
+  }
+
+  @Override
+  public void putLong(long offset, long value) {
+    int bufferIndex = getBufferIndex(offset);
+    DataBuffer buffer = _buffers[bufferIndex];
+    long inBufferIndex = offset - _bufferOffsets[bufferIndex];
+    if (inBufferIndex + Long.BYTES <= buffer.size()) {
+      // fast path
+      buffer.putLong(inBufferIndex, value);
+    } else {
+      // slow path
+      ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES)
+          .order(_order)
+          .putLong(value);
+      byteBuffer.flip();
+      readFrom(offset, byteBuffer);
+    }
+  }
+
+  @Override
+  public float getFloat(long offset) {
+    int bufferIndex = getBufferIndex(offset);
+    DataBuffer buffer = _buffers[bufferIndex];
+    long inBufferIndex = offset - _bufferOffsets[bufferIndex];
+    if (inBufferIndex + Float.BYTES <= buffer.size()) {
+      // fast path
+      return buffer.getFloat(inBufferIndex);
+    } else {
+      // slow path
+      ByteBuffer byteBuffer = copy(offset, Float.BYTES);
+      return byteBuffer.getFloat();
+    }
+  }
+
+  @Override
+  public void putFloat(long offset, float value) {
+    int bufferIndex = getBufferIndex(offset);
+    DataBuffer buffer = _buffers[bufferIndex];
+    long inBufferIndex = offset - _bufferOffsets[bufferIndex];
+    if (inBufferIndex + Float.BYTES <= buffer.size()) {
+      // fast path
+      buffer.putFloat(inBufferIndex, value);
+    } else {
+      // slow path
+      ByteBuffer byteBuffer = ByteBuffer.allocate(Float.BYTES)
+          .order(_order)
+          .putFloat(value);
+      byteBuffer.flip();
+      readFrom(offset, byteBuffer);
+    }
+  }
+
+  @Override
+  public double getDouble(long offset) {
+    int bufferIndex = getBufferIndex(offset);
+    DataBuffer buffer = _buffers[bufferIndex];
+    long inBufferIndex = offset - _bufferOffsets[bufferIndex];
+    if (inBufferIndex + Double.BYTES <= buffer.size()) {
+      // fast path
+      return buffer.getDouble(inBufferIndex);
+    } else {
+      // slow path
+      ByteBuffer byteBuffer = copy(offset, Double.BYTES);
+      return byteBuffer.getDouble();
+    }
+  }
+
+  @Override
+  public void putDouble(long offset, double value) {
+    int bufferIndex = getBufferIndex(offset);
+    DataBuffer buffer = _buffers[bufferIndex];
+    long inBufferIndex = offset - _bufferOffsets[bufferIndex];
+    if (inBufferIndex + Double.BYTES <= buffer.size()) {
+      // fast path
+      buffer.putDouble(inBufferIndex, value);
+    } else {
+      // slow path
+      ByteBuffer byteBuffer = ByteBuffer.allocate(Double.BYTES)
+          .order(_order)
+          .putDouble(value);
+      byteBuffer.flip();
+      readFrom(offset, byteBuffer);
+    }
+  }
+
+  @Override
+  public long size() {
+    return _size;
+  }
+
+  @Override
+  public ByteOrder order() {
+    return _order;
+  }
+
+  @Override
+  public DataBuffer view(long start, long end, ByteOrder byteOrder) {
+    if (start < 0 || end > _size || start > end) {
+      throw new IllegalArgumentException("Invalid start/end: " + start + "/" + 
end);
+    }
+    if (start == 0 && end == _size && byteOrder == _order) {
+      return new CompoundDataBuffer(_buffers, _order, false);
+    }
+    int startBufferIndex = getBufferIndex(start);
+    int endBufferIndex = getBufferIndex(end - 1);
+
+    if (startBufferIndex == endBufferIndex) {
+      long bufferOffset = _bufferOffsets[startBufferIndex];
+      DataBuffer buffer = _buffers[startBufferIndex];
+      return buffer.view(start - bufferOffset, end - bufferOffset, byteOrder);
+    } else {
+      DataBuffer firstBuffer = _buffers[startBufferIndex]
+          .view(start - _bufferOffsets[startBufferIndex], 
_buffers[startBufferIndex].size(), byteOrder);
+      DataBuffer lastBuffer = _buffers[endBufferIndex]
+          .view(0, end - _bufferOffsets[endBufferIndex], byteOrder);
+
+      DataBuffer[] buffers = new DataBuffer[endBufferIndex - startBufferIndex 
+ 1];
+      buffers[0] = firstBuffer;
+      if (buffers.length > 2) {
+        System.arraycopy(_buffers, startBufferIndex + 1, buffers, 1, 
buffers.length - 2);
+      }
+      buffers[buffers.length - 1] = lastBuffer;
+
+      return new CompoundDataBuffer(buffers, byteOrder, false);
+    }
+  }
+
+  @Override
+  public ImmutableRoaringBitmap viewAsRoaringBitmap(long offset, int length) {
+    int bufferIndex = getBufferIndex(offset);
+    DataBuffer buffer = _buffers[bufferIndex];
+    if (length < buffer.size()) {
+      return buffer.viewAsRoaringBitmap(offset, length);
+    } else {
+      ByteBuffer copy = copy(offset, length);
+      return new ImmutableRoaringBitmap(copy);
+    }
+  }
+
+  @Override
+  public ByteBuffer copyOrView(long offset, int size, ByteOrder byteOrder) {
+    if (offset + size > _size) {
+      throw new BufferUnderflowException();
+    }
+    int bufferIndex = getBufferIndex(offset);
+    DataBuffer buffer = _buffers[bufferIndex];
+    long inBufferIndex = offset - _bufferOffsets[bufferIndex];
+    if (inBufferIndex + size <= buffer.size()) {
+      return buffer.copyOrView(inBufferIndex, size, byteOrder);
+    } else {
+      return copy(offset, size).order(byteOrder);
+    }
+  }
+
+  @Override
+  public void flush() {
+    RuntimeException firstException = null;
+
+    for (DataBuffer buffer : _buffers) {
+      try {
+        buffer.flush();
+      } catch (RuntimeException ex) {
+        if (firstException == null) {
+          firstException = ex;
+        }
+      }
+    }
+    if (firstException != null) {
+      throw firstException;
+    }
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    if (!_owner) {
+      return;
+    }
+    IOException firstException = null;
+
+    for (DataBuffer buffer : _buffers) {
+      try {
+        buffer.close();
+      } catch (IOException ex) {
+        if (firstException == null) {
+          firstException = ex;
+        }
+      }
+    }
+    if (firstException != null) {
+      throw firstException;
+    }
+  }
+
+  public DataBuffer[] getBuffers() {
+    return _buffers;
+  }
+
+  @Override
+  public void appendAsByteBuffers(List<ByteBuffer> appendTo) {
+    for (DataBuffer buffer : _buffers) {
+      buffer.appendAsByteBuffers(appendTo);
+    }
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof DataBuffer)) {
+      return false;
+    }
+    DataBuffer buffer = (DataBuffer) o;
+    return DataBuffer.sameContent(this, buffer);
+  }
+
+  @Override
+  public int hashCode() {
+    return DataBuffer.commonHashCode(this);
+  }
+
+  public static class Builder {
+    private final ByteOrder _order;
+    private final boolean _owner;
+    private final ArrayList<DataBuffer> _buffers = new ArrayList<>();
+
+    public Builder() {
+      this(ByteOrder.BIG_ENDIAN, false);
+    }
+
+    public Builder(ByteOrder order) {
+      this(order, false);
+    }
+
+    public Builder(boolean owner) {
+      this(ByteOrder.BIG_ENDIAN, owner);
+    }
+
+    public Builder(ByteOrder order, boolean owner) {
+      _order = order;
+      _owner = owner;
+    }
+
+    public Builder addBuffer(DataBuffer buffer) {
+      if (buffer instanceof CompoundDataBuffer) {
+        CompoundDataBuffer compoundBuffer = (CompoundDataBuffer) buffer;
+        for (DataBuffer childBuffer : compoundBuffer.getBuffers()) {
+          addBuffer(childBuffer);
+        }
+        return this;
+      }
+      if (buffer.size() != 0) {
+        _buffers.add(buffer);
+      }
+      return this;
+    }
+
+    public Builder addPagedOutputStream(PagedPinotOutputStream stream) {
+      ByteBuffer[] pages = stream.getPages();
+      for (ByteBuffer page : pages) {
+        addBuffer(PinotByteBuffer.wrap(page));
+      }
+      return this;
+    }
+
+    public CompoundDataBuffer build() {
+      return new CompoundDataBuffer(_buffers, _order, _owner);
+    }
+  }
+}
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/DataBuffer.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/DataBuffer.java
new file mode 100644
index 0000000000..ec0d0c3261
--- /dev/null
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/DataBuffer.java
@@ -0,0 +1,288 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.memory;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.List;
+import java.util.Objects;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+
+
+public interface DataBuffer extends Closeable {
+
+  default byte getByte(int offset) {
+    return getByte((long) offset);
+  }
+
+  byte getByte(long offset);
+
+  default void putByte(int offset, byte value) {
+    putByte((long) offset, value);
+  }
+
+  void putByte(long offset, byte value);
+
+  default char getChar(int offset) {
+    return getChar((long) offset);
+  }
+
+  char getChar(long offset);
+
+  default void putChar(int offset, char value) {
+    putChar((long) offset, value);
+  }
+
+  void putChar(long offset, char value);
+
+  default short getShort(int offset) {
+    return getShort((long) offset);
+  }
+
+  short getShort(long offset);
+
+  default void putShort(int offset, short value) {
+    putShort((long) offset, value);
+  }
+
+  void putShort(long offset, short value);
+
+  default int getInt(int offset) {
+    return getInt((long) offset);
+  }
+
+  int getInt(long offset);
+
+  default void putInt(int offset, int value) {
+    putInt((long) offset, value);
+  }
+
+  void putInt(long offset, int value);
+
+  default long getLong(int offset) {
+    return getLong((long) offset);
+  }
+
+  long getLong(long offset);
+
+  default void putLong(int offset, long value) {
+    putLong((long) offset, value);
+  }
+
+  void putLong(long offset, long value);
+
+  default float getFloat(int offset) {
+    return getFloat((long) offset);
+  }
+
+  float getFloat(long offset);
+
+  default void putFloat(int offset, float value) {
+    putFloat((long) offset, value);
+  }
+
+  void putFloat(long offset, float value);
+
+  default double getDouble(int offset) {
+    return getDouble((long) offset);
+  }
+
+  double getDouble(long offset);
+
+  default void putDouble(int offset, double value) {
+    putDouble((long) offset, value);
+  }
+
+  void putDouble(long offset, double value);
+
+  /**
+   * Given an array of bytes, copies the content of this object into the array 
of bytes.
+   * The first byte to be copied is the one that could be read with {@code 
this.getByte(offset)}
+   */
+  void copyTo(long offset, byte[] buffer, int destOffset, int size);
+
+  /**
+   * Given an array of bytes, copies the content of this object into the array 
of bytes.
+   * The first byte to be copied is the one that could be read with {@code 
this.getByte(offset)}
+   */
+  default void copyTo(long offset, byte[] buffer) {
+    copyTo(offset, buffer, 0, buffer.length);
+  }
+
+  /**
+   * Note: It is the responsibility of the caller to make sure arguments are 
checked before the methods are called.
+   * While some rudimentary checks are performed on the input, the checks are 
best effort and when performance is an
+   * overriding priority, as when methods of this class are optimized by the 
runtime compiler, some or all checks
+   * (if any) may be elided. Hence, the caller must not rely on the checks and 
corresponding exceptions!
+   */
+  void copyTo(long offset, DataBuffer buffer, long destOffset, long size);
+
+  default void copyTo(long offset, ByteBuffer buffer, int destOffset, int 
size) {
+    PinotByteBuffer wrap = PinotByteBuffer.wrap(buffer);
+    copyTo(offset, wrap, destOffset, size);
+  }
+
+  /**
+   * Given an array of bytes, writes the content in the specified position.
+   */
+  void readFrom(long offset, byte[] buffer, int srcOffset, int size);
+
+  default void readFrom(long offset, byte[] buffer) {
+    readFrom(offset, buffer, 0, buffer.length);
+  }
+
+  void readFrom(long offset, ByteBuffer buffer);
+
+  void readFrom(long offset, File file, long srcOffset, long size)
+      throws IOException;
+
+  long size();
+
+  ByteOrder order();
+
+  /**
+   * Creates a view of the range [start, end) of this buffer with the given 
byte order. Calling {@link #flush()} or
+   * {@link #close()} has no effect on view.
+   */
+  DataBuffer view(long start, long end, ByteOrder byteOrder);
+
+  /**
+   * Creates a view of the range [start, end) of this buffer with the current 
byte order. Calling {@link #flush()} or
+   * {@link #close()} has no effect on view.
+   */
+  default DataBuffer view(long start, long end) {
+    return view(start, end, order());
+  }
+
+  void flush();
+
+  default PinotInputStream openInputStream() {
+    return new DataBufferPinotInputStream(this);
+  }
+
+  default PinotInputStream openInputStream(long offset) {
+    return openInputStream(offset, size() - offset);
+  }
+
+  default PinotInputStream openInputStream(long offset, long length) {
+    return new DataBufferPinotInputStream(this, offset, offset + length);
+  }
+
+  /**
+   * Reads the range [offset, offset + length) as a RoaringBitmap.
+   * <p>
+   * Implementations should do their best to do not allocate memory and 
instead return a view of the underlying data.
+   */
+  ImmutableRoaringBitmap viewAsRoaringBitmap(long offset, int length);
+
+  /**
+   * Returns a ByteBuffer whose content is the same as the range [offset, 
offset + size) of this buffer.
+   * <p>
+   * Implementations should do their best to do not allocate memory and 
instead return a view of the underlying data.
+   * Callers cannot assume they have the ownership of the returned ByteBuffer, 
and therefore should not call
+   * {@link CleanerUtil#cleanQuietly(ByteBuffer)} using the returned object.
+   */
+  ByteBuffer copyOrView(long offset, int size, ByteOrder byteOrder);
+
+  void appendAsByteBuffers(List<ByteBuffer> appendTo);
+
+  /**
+   *
+   * Returns a ByteBuffer whose content is the same as the range [offset, 
offset + size) of this buffer.
+   * <p>
+   * Implementations should do their best to do not allocate memory and 
instead return a view of the underlying data.
+   * Callers cannot assume they have the ownership of the returned ByteBuffer, 
and therefore should not call
+   * {@link CleanerUtil#cleanQuietly(ByteBuffer)} using the returned object.
+   * <p>
+   * The returned ByteBuffer will have the same byte order as this buffer.
+   */
+  default ByteBuffer copyOrView(long offset, int size) {
+    return copyOrView(offset, size, order());
+  }
+
+  static boolean sameContent(DataBuffer buffer1, DataBuffer buffer2) {
+    long size = buffer1.size();
+    if (size != buffer2.size()) {
+      return false;
+    }
+    DataBuffer nativeBuffer1 = buffer1.view(0, size, ByteOrder.nativeOrder());
+    DataBuffer nativeBuffer2 = buffer2.view(0, size, ByteOrder.nativeOrder());
+
+    long maxLong = size & ~7L;
+    for (long i = 0; i < maxLong; i += 8) {
+      if (nativeBuffer1.getLong(i) != nativeBuffer2.getLong(i)) {
+        return false;
+      }
+    }
+    for (long i = maxLong; i < size; i++) {
+      if (buffer1.getByte(i) != buffer2.getByte(i)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  static int commonHashCode(DataBuffer dataBuffer) {
+    long firstLong;
+    long lastLong;
+    long size = dataBuffer.size();
+    int intSize;
+    if (size > Integer.MAX_VALUE) {
+      intSize = Integer.MAX_VALUE;
+    } else {
+      intSize = (int) size;
+    }
+    switch (intSize) {
+      case 0:
+        firstLong = 0;
+        lastLong = 0;
+        break;
+      case 1:
+        firstLong = dataBuffer.getByte(0);
+        lastLong = firstLong;
+        break;
+      case 2:
+        firstLong = dataBuffer.getShort(0);
+        lastLong = firstLong;
+        break;
+      case 3:
+        firstLong = dataBuffer.getShort(0);
+        lastLong = dataBuffer.getShort(1);
+        break;
+      case 4:
+        firstLong = dataBuffer.getInt(0);
+        lastLong = firstLong;
+        break;
+      case 5:
+      case 6:
+      case 7:
+        firstLong = dataBuffer.getInt(0);
+        lastLong = dataBuffer.getInt(intSize - 4);
+        break;
+      default:
+        firstLong = dataBuffer.getLong(0);
+        lastLong = dataBuffer.getLong(size - 8);
+        break;
+    }
+    return Objects.hash(size, firstLong, lastLong);
+  }
+}
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/DataBufferPinotInputStream.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/DataBufferPinotInputStream.java
new file mode 100644
index 0000000000..e470b4863b
--- /dev/null
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/DataBufferPinotInputStream.java
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.memory;
+
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+
+/**
+ * An adaptor that allows a {@link DataBuffer} to be read as a {@link PinotInputStream}.
+ * <p>
+ * The stream reads from a {@link ByteOrder#BIG_ENDIAN} view of the wrapped buffer, so multi-byte values are decoded in
+ * big-endian order whatever the order of the original buffer. Offsets returned by {@link #getCurrentOffset()} and
+ * accepted by {@link #seek(long)} are relative to the start of that view.
+ */
+public class DataBufferPinotInputStream extends PinotInputStream {
+  private final DataBuffer _dataBuffer;
+  private long _currentOffset;
+
+  /**
+   * Creates a stream over the whole content of {@code dataBuffer}.
+   */
+  public DataBufferPinotInputStream(DataBuffer dataBuffer) {
+    this(dataBuffer, 0, dataBuffer.size());
+  }
+
+  /**
+   * Creates a stream over the {@code [startOffset, endOffset)} range of {@code dataBuffer}.
+   */
+  public DataBufferPinotInputStream(DataBuffer dataBuffer, long startOffset, long endOffset) {
+    _dataBuffer = dataBuffer.view(startOffset, endOffset, ByteOrder.BIG_ENDIAN);
+    _currentOffset = 0;
+  }
+
+  @Override
+  public long getCurrentOffset() {
+    return _currentOffset;
+  }
+
+  /**
+   * Moves the cursor to {@code newPos}. The value must be between 0 and the size of the stream, both inclusive.
+   *
+   * @throws IllegalArgumentException if {@code newPos} is out of range
+   */
+  @Override
+  public void seek(long newPos) {
+    if (newPos < 0 || newPos > _dataBuffer.size()) {
+      throw new IllegalArgumentException("Invalid new position: " + newPos);
+    }
+    _currentOffset = newPos;
+  }
+
+  /**
+   * Reads up to {@code buf.remaining()} bytes into {@code buf}. Writing starts at the buffer's position, and the
+   * position is advanced by the number of bytes read.
+   *
+   * @return the number of bytes read, or -1 if the end of the stream had already been reached
+   */
+  @Override
+  public int read(ByteBuffer buf) {
+    int remaining = available();
+    if (remaining == 0) {
+      return -1;
+    }
+    int toRead = Math.min(remaining, buf.remaining());
+    if (toRead > 0) {
+      // PinotByteBuffer offsets are absolute indexes into the wrapped ByteBuffer. So write at buf's position (not at
+      // index 0) and advance the position afterwards, as ByteBuffer-based readers expect.
+      int position = buf.position();
+      _dataBuffer.copyTo(_currentOffset, PinotByteBuffer.wrap(buf), position, toRead);
+      buf.position(position + toRead);
+      _currentOffset += toRead;
+    }
+    return toRead;
+  }
+
+  @Override
+  public int read() {
+    if (_currentOffset >= _dataBuffer.size()) {
+      return -1;
+    }
+    return _dataBuffer.getByte(_currentOffset++) & 0xFF;
+  }
+
+  /**
+   * Reads up to {@code len} bytes into {@code b} starting at {@code off}.
+   *
+   * @return the number of bytes read; 0 when {@code len} is 0; -1 at end of stream
+   */
+  @Override
+  public int read(byte[] b, int off, int len) {
+    if (off < 0 || len < 0 || len > b.length - off) {
+      throw new IndexOutOfBoundsException("off=" + off + ", len=" + len + ", b.length=" + b.length);
+    }
+    if (len == 0) {
+      // InputStream contract: a zero-length read returns 0, even at end of stream
+      return 0;
+    }
+    int available = available();
+    if (available == 0) {
+      return -1;
+    }
+    int result = Math.min(available, len);
+    _dataBuffer.copyTo(_currentOffset, b, off, result);
+    _currentOffset += result;
+    return result;
+  }
+
+  /**
+   * Skips forward up to {@code n} bytes. Non-positive values skip nothing; use {@link #seek(long)} to move backwards.
+   *
+   * @return the number of bytes actually skipped
+   */
+  @Override
+  public long skip(long n) {
+    if (n <= 0) {
+      return 0;
+    }
+    long increase = Math.min(n, availableLong());
+    _currentOffset += increase;
+    return increase;
+  }
+
+  /**
+   * Returns the exact number of bytes between the cursor and the end of the stream (not capped at
+   * {@link Integer#MAX_VALUE}, unlike {@link #available()}).
+   */
+  public long availableLong() {
+    return _dataBuffer.size() - _currentOffset;
+  }
+
+  @Override
+  public int available() {
+    long available = availableLong();
+    return available > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) available;
+  }
+
+  /**
+   * Reads exactly {@code len} bytes into {@code b} starting at {@code off}.
+   * <p>
+   * If fewer than {@code len} bytes remain, the remaining bytes are copied and consumed before the
+   * {@link EOFException} is thrown. This follows DataInput.readFully, which may modify the array before detecting
+   * end of file.
+   */
+  @Override
+  public void readFully(byte[] b, int off, int len)
+      throws EOFException {
+    if (len < 0) {
+      throw new IndexOutOfBoundsException("len is negative: " + len);
+    }
+    if (off < 0 || len > b.length - off) {
+      throw new IndexOutOfBoundsException("off=" + off + ", len=" + len + ", b.length=" + b.length);
+    }
+    long available = availableLong();
+    if (available < len) {
+      // Never copy past the end of the underlying view: only the bytes that actually exist are transferred.
+      int partial = (int) available;
+      _dataBuffer.copyTo(_currentOffset, b, off, partial);
+      _currentOffset += partial;
+      throw new EOFException();
+    }
+    _dataBuffer.copyTo(_currentOffset, b, off, len);
+    _currentOffset += len;
+  }
+
+  @Override
+  public boolean readBoolean()
+      throws EOFException {
+    if (availableLong() < 1) {
+      throw new EOFException();
+    }
+    return _dataBuffer.getByte(_currentOffset++) != 0;
+  }
+
+  @Override
+  public byte readByte()
+      throws EOFException {
+    if (availableLong() < 1) {
+      throw new EOFException();
+    }
+    return _dataBuffer.getByte(_currentOffset++);
+  }
+
+  @Override
+  public int readUnsignedByte()
+      throws EOFException {
+    if (availableLong() < 1) {
+      throw new EOFException();
+    }
+    return _dataBuffer.getByte(_currentOffset++) & 0xFF;
+  }
+
+  @Override
+  public short readShort()
+      throws EOFException {
+    if (availableLong() < Short.BYTES) {
+      throw new EOFException();
+    }
+    short result = _dataBuffer.getShort(_currentOffset);
+    _currentOffset += Short.BYTES;
+    return result;
+  }
+
+  @Override
+  public int readUnsignedShort()
+      throws EOFException {
+    return readShort() & 0xFFFF;
+  }
+
+  @Override
+  public char readChar()
+      throws EOFException {
+    return (char) readUnsignedShort();
+  }
+
+  @Override
+  public int readInt()
+      throws EOFException {
+    if (availableLong() < Integer.BYTES) {
+      throw new EOFException();
+    }
+    int result = _dataBuffer.getInt(_currentOffset);
+    _currentOffset += Integer.BYTES;
+    return result;
+  }
+
+  @Override
+  public long readLong()
+      throws EOFException {
+    if (availableLong() < Long.BYTES) {
+      throw new EOFException();
+    }
+    long result = _dataBuffer.getLong(_currentOffset);
+    _currentOffset += Long.BYTES;
+    return result;
+  }
+
+  @Override
+  public float readFloat()
+      throws EOFException {
+    if (availableLong() < Float.BYTES) {
+      throw new EOFException();
+    }
+    float result = _dataBuffer.getFloat(_currentOffset);
+    _currentOffset += Float.BYTES;
+    return result;
+  }
+
+  @Override
+  public double readDouble()
+      throws EOFException {
+    if (availableLong() < Double.BYTES) {
+      throw new EOFException();
+    }
+    double result = _dataBuffer.getDouble(_currentOffset);
+    _currentOffset += Double.BYTES;
+    return result;
+  }
+
+  /**
+   * Reads the next line of text using the same decoding as DataInputStream.readLine().
+   * <p>
+   * Each byte becomes the char with the same low 8 bits. A line ends at {@code '\n'}, {@code '\r'} or {@code "\r\n"}.
+   *
+   * @return the next line without its terminator, or {@code null} if the stream is already at its end
+   */
+  @Deprecated
+  @Override
+  public String readLine()
+      throws IOException {
+    long size = _dataBuffer.size();
+    if (_currentOffset >= size) {
+      return null;
+    }
+    // Peek directly at the buffer instead of delegating to a throw-away DataInputStream. That wrapper pushes back the
+    // byte after a lone '\r' into itself, and the byte would be lost when the wrapper is discarded.
+    StringBuilder line = new StringBuilder();
+    while (_currentOffset < size) {
+      int c = _dataBuffer.getByte(_currentOffset++) & 0xFF;
+      if (c == '\n') {
+        break;
+      }
+      if (c == '\r') {
+        if (_currentOffset < size && _dataBuffer.getByte(_currentOffset) == '\n') {
+          _currentOffset++;
+        }
+        break;
+      }
+      line.append((char) c);
+    }
+    return line.toString();
+  }
+
+  @Override
+  public String readUTF()
+      throws IOException {
+    return new DataInputStream(this).readUTF();
+  }
+}
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/NonNativePinotDataBuffer.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/NonNativePinotDataBuffer.java
index 755345666f..3d29cb549e 100644
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/NonNativePinotDataBuffer.java
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/NonNativePinotDataBuffer.java
@@ -205,7 +205,7 @@ public class NonNativePinotDataBuffer extends 
PinotDataBuffer {
   }
 
   @Override
-  public void copyTo(long offset, PinotDataBuffer buffer, long destOffset, 
long size) {
+  public void copyTo(long offset, DataBuffer buffer, long destOffset, long 
size) {
     _nativeBuffer.copyTo(offset, buffer, destOffset, size);
   }
 
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/PagedPinotOutputStream.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/PagedPinotOutputStream.java
new file mode 100644
index 0000000000..4153761e99
--- /dev/null
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/PagedPinotOutputStream.java
@@ -0,0 +1,406 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.memory;
+
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.ArrayList;
+import org.apache.commons.lang3.ArchUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * An implementation of {@link PinotOutputStream} that writes to a sequence of pages.
+ * <p>
+ * When a page is full, a new page is allocated and writing continues there.
+ * The page size is determined by the {@link PageAllocator} used to create the stream.
+ * <p>
+ * This class is especially useful when the final size of the data is not known in advance, or when the writer needs
+ * to {@link #seek(long) seek} to a specific position in the stream.
+ * Writes that cross page boundaries are handled transparently, at some performance cost.
+ * <p>
+ * Some {@link PageAllocator} implementations may support releasing pages, which can be useful to reduce memory usage.
+ * Smaller pages mean more pages are allocated, but each allocation is faster. This matters most for heap allocation,
+ * where allocations smaller than the TLAB are faster.
+ * <p>
+ * Data written to this stream can be retrieved as a list of {@link ByteBuffer} pages using {@link #getPages()}.
+ * Those pages can be read directly using {@link CompoundDataBuffer} or sent over the network using
+ * {@link com.google.protobuf.UnsafeByteOperations#unsafeWrap(byte[]) GRPC}.
+ */
+public class PagedPinotOutputStream extends PinotOutputStream {
+  private final PageAllocator _allocator;
+  private final int _pageSize;
+  private final ArrayList<ByteBuffer> _pages;
+  private ByteBuffer _currentPage;
+  // Absolute stream offset of the first byte of _currentPage; always pageIdx * _pageSize.
+  private long _currentPageStartOffset;
+  private int _offsetInPage;
+  // High-water mark: the largest offset ever reached by a write or a seek.
+  private long _written = 0;
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(PagedPinotOutputStream.class);
+
+  public PagedPinotOutputStream(PageAllocator allocator) {
+    _pageSize = allocator.pageSize();
+    _allocator = allocator;
+    _pages = new ArrayList<>(8);
+    _currentPage = _allocator.allocate().order(ByteOrder.BIG_ENDIAN);
+    _currentPageStartOffset = 0;
+    _pages.add(_currentPage);
+  }
+
+  public static PagedPinotOutputStream createHeap() {
+    return new PagedPinotOutputStream(HeapPageAllocator.createSmall());
+  }
+
+  private void nextPage() {
+    moveCurrentOffset(remainingInPage());
+  }
+
+  private int remainingInPage() {
+    return _pageSize - _offsetInPage;
+  }
+
+  /**
+   * Returns a read-only view of the pages written so far.
+   * <p>
+   * Only pages that contain at least one written byte are returned. Every returned page has position 0. All pages
+   * except the last have their limit set to their capacity. The last page has its limit set to the end of the written
+   * data, which is the full page size when the data ends exactly on a page boundary.
+   * <p>
+   * This method has no side effects on the stream: no page is allocated and the current offset is not modified.
+   *
+   * TODO: Add one option that let caller choose start and end offset.
+   */
+  public ByteBuffer[] getPages() {
+    if (_written == 0) {
+      return new ByteBuffer[0];
+    }
+    // ceil(_written / _pageSize), computed without overflow
+    int numPages = (int) ((_written - 1) / _pageSize) + 1;
+    ByteBuffer[] result = new ByteBuffer[numPages];
+    for (int i = 0; i < numPages; i++) {
+      ByteBuffer page = _pages.get(i).asReadOnlyBuffer();
+      page.clear();
+      result[i] = page;
+    }
+    // In [1, _pageSize]; exactly _pageSize when the data ends on a page boundary.
+    int lastPageLimit = (int) (_written - (numPages - 1) * (long) _pageSize);
+    result[numPages - 1].limit(lastPageLimit);
+    return result;
+  }
+
+  @Override
+  public long getCurrentOffset() {
+    return _currentPageStartOffset + _offsetInPage;
+  }
+
+  /**
+   * Moves the cursor to {@code newPos}, allocating any pages needed to reach it.
+   * <p>
+   * Seeking beyond the current end extends the written region up to {@code newPos}.
+   *
+   * @throws IllegalArgumentException if {@code newPos} is negative
+   */
+  @Override
+  public void seek(long newPos) {
+    if (newPos < 0) {
+      throw new IllegalArgumentException("New position cannot be negative");
+    }
+    int pageIdx = (int) (newPos / _pageSize);
+    if (pageIdx >= _pages.size()) {
+      _pages.ensureCapacity(pageIdx + 1);
+      while (_pages.size() <= pageIdx) {
+        _pages.add(_allocator.allocate().order(ByteOrder.BIG_ENDIAN));
+      }
+    }
+    // Always update the page start offset as well, including for newPos == 0. Otherwise getCurrentOffset() and
+    // _written would be computed from a stale page start after seeking back to the beginning.
+    _currentPage = _pages.get(pageIdx);
+    _currentPageStartOffset = pageIdx * (long) _pageSize;
+    _offsetInPage = (int) (newPos % _pageSize);
+    _written = Math.max(_written, newPos);
+  }
+
+  @Override
+  public void write(int b)
+      throws IOException {
+    if (remainingInPage() == 0) {
+      nextPage();
+    }
+    _currentPage.put(_offsetInPage++, (byte) b);
+    _written = Math.max(_written, _offsetInPage + _currentPageStartOffset);
+  }
+
+  @Override
+  public void writeInt(int v)
+      throws IOException {
+    if (remainingInPage() >= Integer.BYTES) {
+      _currentPage.putInt(_offsetInPage, v);
+      _offsetInPage += Integer.BYTES;
+      _written = Math.max(_written, _offsetInPage + _currentPageStartOffset);
+    } else {
+      // crosses a page boundary: fall back to the byte-by-byte implementation
+      super.writeInt(v);
+    }
+  }
+
+  @Override
+  public void writeLong(long v)
+      throws IOException {
+    if (remainingInPage() >= Long.BYTES) {
+      _currentPage.putLong(_offsetInPage, v);
+      _offsetInPage += Long.BYTES;
+      _written = Math.max(_written, _offsetInPage + _currentPageStartOffset);
+    } else {
+      super.writeLong(v);
+    }
+  }
+
+  @Override
+  public void writeShort(int v)
+      throws IOException {
+    if (remainingInPage() >= Short.BYTES) {
+      _currentPage.putShort(_offsetInPage, (short) v);
+      _offsetInPage += Short.BYTES;
+      _written = Math.max(_written, _offsetInPage + _currentPageStartOffset);
+    } else {
+      super.writeShort(v);
+    }
+  }
+
+  @Override
+  public void write(byte[] b, int off, int len)
+      throws IOException {
+    if (remainingInPage() >= len) {
+      _currentPage.position(_offsetInPage);
+      _currentPage.put(b, off, len);
+      _offsetInPage += len;
+      _currentPage.position(0);
+    } else {
+      int written = 0;
+      while (written < len) {
+        int remainingInPage = remainingInPage();
+        if (remainingInPage == 0) {
+          nextPage();
+          continue;
+        }
+        int toWrite = Math.min(len - written, remainingInPage);
+        _currentPage.position(_offsetInPage);
+        _currentPage.put(b, off + written, toWrite);
+        _currentPage.position(0);
+        written += toWrite;
+        _offsetInPage += toWrite;
+      }
+    }
+    _written = Math.max(_written, _offsetInPage + _currentPageStartOffset);
+  }
+
+  @Override
+  public void write(DataBuffer input, long offset, long length)
+      throws IOException {
+    if (remainingInPage() >= length) {
+      int intLength = (int) length;
+      input.copyTo(offset, _currentPage, _offsetInPage, intLength);
+      _offsetInPage += intLength;
+    } else {
+      long written = 0;
+      while (written < length) {
+        int remainingInPage = remainingInPage();
+        if (remainingInPage == 0) {
+          nextPage();
+          continue;
+        }
+        int toWrite = (int) Math.min(length - written, remainingInPage);
+        input.copyTo(offset + written, _currentPage, _offsetInPage, toWrite);
+        written += toWrite;
+        _offsetInPage += toWrite;
+      }
+    }
+    _written = Math.max(_written, _offsetInPage + _currentPageStartOffset);
+  }
+
+  /**
+   * Returns a view of the data written so far as a {@link DataBuffer}.
+   * <p>
+   * The returned DataBuffer contains all the data written, up to the high-water mark. This matters when
+   * {@link #getCurrentOffset()} has been moved back from the latest written position.
+   *
+   * TODO: Add one option that let caller choose start and end offset.
+   */
+  public DataBuffer asBuffer(ByteOrder order, boolean owner) {
+    if (_written == 0) {
+      return PinotDataBuffer.empty();
+    }
+
+    // TODO: We can remove this check
+    ByteBuffer[] pages = getPages();
+    for (int i = 0; i < pages.length; i++) {
+      ByteBuffer page = pages[i];
+      if (page.remaining() != _pageSize && (i != pages.length - 1 || !page.hasRemaining())) {
+        throw new IllegalArgumentException("Unexpected remaining bytes in page " + i + ": " + page.remaining());
+      }
+    }
+
+    return new CompoundDataBuffer(pages, order, owner);
+  }
+
+  public int getPageSize() {
+    return _pageSize;
+  }
+
+  /**
+   * Releases every page through the allocator. Every page is attempted; the first failure is rethrown with later
+   * failures attached as suppressed exceptions.
+   */
+  @Override
+  public void close()
+      throws IOException {
+    IOException ex = null;
+    for (ByteBuffer page : _pages) {
+      try {
+        _allocator.release(page);
+      } catch (IOException e) {
+        if (ex == null) {
+          ex = e;
+        } else {
+          ex.addSuppressed(e);
+        }
+      }
+    }
+    if (ex != null) {
+      throw ex;
+    }
+  }
+
+  public static abstract class PageAllocator {
+    public static final int MIN_RECOMMENDED_PAGE_SIZE;
+    public static final int MAX_RECOMMENDED_PAGE_SIZE;
+
+    static {
+      int minRecommendedPageSize = -1;
+      int maxRecommendedPageSize = -1;
+      try {
+        switch (ArchUtils.getProcessor().getType()) {
+          case AARCH_64:
+            // AArch64 recommendation: 16KB minimum, 1MB maximum
+            minRecommendedPageSize = 16 * 1024;
+            maxRecommendedPageSize = 1024 * 1024;
+            break;
+          case X86:
+          default:
+            // x86 and any other architecture: 4KB minimum, 4MB maximum
+            minRecommendedPageSize = 4 * 1024;
+            maxRecommendedPageSize = 4 * 1024 * 1024;
+            break;
+        }
+      } catch (Throwable t) {
+        LOGGER.warn("Could not determine processor architecture. Falling back to default values", t);
+        // Fallback to 4KB and 4MBs
+        minRecommendedPageSize = 4 * 1024;
+        maxRecommendedPageSize = 4 * 1024 * 1024;
+      }
+      MIN_RECOMMENDED_PAGE_SIZE = minRecommendedPageSize;
+      MAX_RECOMMENDED_PAGE_SIZE = maxRecommendedPageSize;
+    }
+
+    public abstract int pageSize();
+
+    public abstract ByteBuffer allocate();
+
+    public abstract void release(ByteBuffer buffer)
+        throws IOException;
+  }
+
+  public static class HeapPageAllocator extends PageAllocator {
+
+    private final int _pageSize;
+
+    public static HeapPageAllocator createSmall() {
+      return new HeapPageAllocator(MIN_RECOMMENDED_PAGE_SIZE);
+    }
+
+    public static HeapPageAllocator createLarge() {
+      return new HeapPageAllocator(MAX_RECOMMENDED_PAGE_SIZE);
+    }
+
+    public HeapPageAllocator(int pageSize) {
+      Preconditions.checkArgument(pageSize > 0, "Page size must be positive");
+      _pageSize = pageSize;
+    }
+
+    @Override
+    public int pageSize() {
+      return _pageSize;
+    }
+
+    @Override
+    public ByteBuffer allocate() {
+      return ByteBuffer.allocate(_pageSize);
+    }
+
+    @Override
+    public void release(ByteBuffer buffer) {
+      // Do nothing: heap pages are reclaimed by the garbage collector
+    }
+  }
+
+  public static class DirectPageAllocator extends PageAllocator {
+
+    private final int _pageSize;
+    // When true, pages are freed eagerly on release (if the JVM supports unmapping) instead of waiting for GC
+    private final boolean _release;
+
+    public DirectPageAllocator(int pageSize) {
+      this(pageSize, false);
+    }
+
+    public DirectPageAllocator(int pageSize, boolean release) {
+      Preconditions.checkArgument(pageSize > 0, "Page size must be positive");
+      _pageSize = pageSize;
+      _release = release;
+    }
+
+    public static DirectPageAllocator createSmall(boolean release) {
+      return new DirectPageAllocator(MIN_RECOMMENDED_PAGE_SIZE, release);
+    }
+
+    public static DirectPageAllocator createLarge(boolean release) {
+      return new DirectPageAllocator(MAX_RECOMMENDED_PAGE_SIZE, release);
+    }
+
+    @Override
+    public int pageSize() {
+      return _pageSize;
+    }
+
+    @Override
+    public ByteBuffer allocate() {
+      return ByteBuffer.allocateDirect(_pageSize);
+    }
+
+    @Override
+    public void release(ByteBuffer buffer)
+        throws IOException {
+      if (_release && CleanerUtil.UNMAP_SUPPORTED) {
+        CleanerUtil.getCleaner().freeBuffer(buffer);
+      }
+    }
+  }
+}
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/PinotByteBuffer.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/PinotByteBuffer.java
index 1cb357ea44..c3fc91f28b 100644
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/PinotByteBuffer.java
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/PinotByteBuffer.java
@@ -31,6 +31,8 @@ import javax.annotation.concurrent.ThreadSafe;
 
 @ThreadSafe
 public class PinotByteBuffer extends PinotDataBuffer {
+  public static final PinotDataBuffer EMPTY = 
PinotByteBuffer.wrap(ByteBuffer.allocate(0));
+
   private final ByteBuffer _buffer;
   private final boolean _flushable;
 
@@ -60,6 +62,14 @@ public class PinotByteBuffer extends PinotDataBuffer {
     }
   }
 
+  /**
+   * Wraps the given array in a {@link PinotByteBuffer} that is neither closeable nor flushable.
+   * The array is shared with the returned buffer, not copied.
+   */
+  public static PinotByteBuffer wrap(byte[] buffer) {
+    return new PinotByteBuffer(ByteBuffer.wrap(buffer), false, false);
+  }
+
+  /**
+   * Wraps the given {@link ByteBuffer} in a {@link PinotByteBuffer} that is neither closeable nor flushable.
+   * The ByteBuffer instance is used as-is (not duplicated), so changes through either object are visible in both.
+   */
+  public static PinotByteBuffer wrap(ByteBuffer buffer) {
+    return new PinotByteBuffer(buffer, false, false);
+  }
+
   private PinotByteBuffer(ByteBuffer buffer, boolean closeable, boolean 
flushable) {
     super(closeable);
     _buffer = buffer;
@@ -237,7 +247,7 @@ public class PinotByteBuffer extends PinotDataBuffer {
   }
 
   @Override
-  public void copyTo(long offset, PinotDataBuffer buffer, long destOffset, 
long size) {
+  public void copyTo(long offset, DataBuffer buffer, long destOffset, long 
size) {
     assert offset <= Integer.MAX_VALUE;
     assert size <= Integer.MAX_VALUE;
     int start = (int) offset;
@@ -282,7 +292,7 @@ public class PinotByteBuffer extends PinotDataBuffer {
       ByteBuffer duplicate = _buffer.duplicate();
       int start = (int) offset;
       int end = start + (int) size;
-      ((Buffer) duplicate).position(start).limit(end);
+      duplicate.position(start).limit(end);
       channel.read(duplicate, srcOffset);
     }
   }
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/PinotDataBuffer.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/PinotDataBuffer.java
index 425f7e6b84..2ed9d22e92 100644
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/PinotDataBuffer.java
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/PinotDataBuffer.java
@@ -19,7 +19,7 @@
 package org.apache.pinot.segment.spi.memory;
 
 import com.google.common.annotations.VisibleForTesting;
-import java.io.Closeable;
+import com.google.common.collect.MapMaker;
 import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
@@ -29,7 +29,6 @@ import java.nio.channels.FileChannel;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.WeakHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
@@ -37,6 +36,7 @@ import 
org.apache.pinot.segment.spi.memory.unsafe.UnsafePinotBufferFactory;
 import org.apache.pinot.segment.spi.utils.JavaVersion;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.plugin.PluginManager;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,7 +57,7 @@ import org.slf4j.LoggerFactory;
  * </ul>
  */
 @ThreadSafe
-public abstract class PinotDataBuffer implements Closeable {
+public abstract class PinotDataBuffer implements DataBuffer {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PinotDataBuffer.class);
 
   public static final ByteOrder NATIVE_ORDER = ByteOrder.nativeOrder();
@@ -71,8 +71,8 @@ public abstract class PinotDataBuffer implements Closeable {
   private static final boolean DEFAULT_PRIORITIZE_BYTE_BUFFER;
 
   static {
-     String skipBbEnvValue = System.getenv(SKIP_BYTEBUFFER_ENV);
-     DEFAULT_PRIORITIZE_BYTE_BUFFER = !Boolean.parseBoolean(skipBbEnvValue);
+    String skipBbEnvValue = System.getenv(SKIP_BYTEBUFFER_ENV);
+    DEFAULT_PRIORITIZE_BYTE_BUFFER = !Boolean.parseBoolean(skipBbEnvValue);
   }
 
   private static class BufferContext {
@@ -110,7 +110,8 @@ public abstract class PinotDataBuffer implements Closeable {
   private static final AtomicLong MMAP_BUFFER_COUNT = new AtomicLong();
   private static final AtomicLong MMAP_BUFFER_USAGE = new AtomicLong();
   private static final AtomicLong ALLOCATION_FAILURE_COUNT = new AtomicLong();
-  private static final Map<PinotDataBuffer, BufferContext> BUFFER_CONTEXT_MAP 
= new WeakHashMap<>();
+  // we need to use MapMaker instead of WeakHashMap because we want to use 
identity comparison for the keys
+  private static final Map<PinotDataBuffer, BufferContext> BUFFER_CONTEXT_MAP 
= new MapMaker().weakKeys().makeMap();
 
   /**
    * Configuration key used to change the offheap buffer factory used by Pinot.
@@ -233,9 +234,7 @@ public abstract class PinotDataBuffer implements Closeable {
     }
     DIRECT_BUFFER_COUNT.getAndIncrement();
     DIRECT_BUFFER_USAGE.getAndAdd(size);
-    synchronized (BUFFER_CONTEXT_MAP) {
-      BUFFER_CONTEXT_MAP.put(buffer, new 
BufferContext(BufferContext.Type.DIRECT, size, null, description));
-    }
+    BUFFER_CONTEXT_MAP.put(buffer, new 
BufferContext(BufferContext.Type.DIRECT, size, null, description));
     return buffer;
   }
 
@@ -257,10 +256,8 @@ public abstract class PinotDataBuffer implements Closeable 
{
     }
     DIRECT_BUFFER_COUNT.getAndIncrement();
     DIRECT_BUFFER_USAGE.getAndAdd(size);
-    synchronized (BUFFER_CONTEXT_MAP) {
-      BUFFER_CONTEXT_MAP.put(buffer,
-          new BufferContext(BufferContext.Type.DIRECT, size, 
file.getAbsolutePath().intern(), description));
-    }
+    BUFFER_CONTEXT_MAP.put(buffer,
+        new BufferContext(BufferContext.Type.DIRECT, size, 
file.getAbsolutePath().intern(), description));
     return buffer;
   }
 
@@ -292,10 +289,8 @@ public abstract class PinotDataBuffer implements Closeable 
{
     }
     MMAP_BUFFER_COUNT.getAndIncrement();
     MMAP_BUFFER_USAGE.getAndAdd(size);
-    synchronized (BUFFER_CONTEXT_MAP) {
-      BUFFER_CONTEXT_MAP
-          .put(buffer, new BufferContext(BufferContext.Type.MMAP, size, 
file.getAbsolutePath().intern(), description));
-    }
+    BUFFER_CONTEXT_MAP.put(buffer,
+        new BufferContext(BufferContext.Type.MMAP, size, 
file.getAbsolutePath().intern(), description));
     return buffer;
   }
 
@@ -338,19 +333,20 @@ public abstract class PinotDataBuffer implements 
Closeable {
   }
 
   public static List<String> getBufferInfo() {
-    synchronized (BUFFER_CONTEXT_MAP) {
-      List<String> bufferInfo = new ArrayList<>(BUFFER_CONTEXT_MAP.size());
-      for (BufferContext bufferContext : BUFFER_CONTEXT_MAP.values()) {
-        bufferInfo.add(bufferContext.toString());
-      }
-      return bufferInfo;
+    List<String> bufferInfo = new ArrayList<>(BUFFER_CONTEXT_MAP.size());
+    for (BufferContext bufferContext : BUFFER_CONTEXT_MAP.values()) {
+      bufferInfo.add(bufferContext.toString());
     }
+    return bufferInfo;
   }
 
   private static String getBufferStats() {
-    return String
-        .format("Direct buffer count: %s, size: %s; Mmap buffer count: %s, 
size: %s", DIRECT_BUFFER_COUNT.get(),
-            DIRECT_BUFFER_USAGE.get(), MMAP_BUFFER_COUNT.get(), 
MMAP_BUFFER_USAGE.get());
+    return String.format("Direct buffer count: %s, size: %s; Mmap buffer 
count: %s, size: %s",
+        DIRECT_BUFFER_COUNT.get(), DIRECT_BUFFER_USAGE.get(), 
MMAP_BUFFER_COUNT.get(), MMAP_BUFFER_USAGE.get());
+  }
+
+  /**
+   * Returns the shared, zero-length buffer ({@code PinotByteBuffer.EMPTY}). The same instance is returned on every
+   * call.
+   */
+  public static PinotDataBuffer empty() {
+    return PinotByteBuffer.EMPTY;
+  }
 
   private volatile boolean _closeable;
@@ -363,12 +359,8 @@ public abstract class PinotDataBuffer implements Closeable 
{
   public synchronized void close()
       throws IOException {
     if (_closeable) {
-      flush();
-      release();
       BufferContext bufferContext;
-      synchronized (BUFFER_CONTEXT_MAP) {
-        bufferContext = BUFFER_CONTEXT_MAP.remove(this);
-      }
+      bufferContext = BUFFER_CONTEXT_MAP.remove(this);
       if (bufferContext != null) {
         if (bufferContext._type == BufferContext.Type.DIRECT) {
           DIRECT_BUFFER_COUNT.getAndDecrement();
@@ -378,98 +370,129 @@ public abstract class PinotDataBuffer implements 
Closeable {
           MMAP_BUFFER_USAGE.getAndAdd(-bufferContext._size);
         }
       }
+      flush();
+      release();
       _closeable = false;
     }
   }
 
+  @Override
   public byte getByte(int offset) {
     return getByte((long) offset);
   }
 
+  @Override
   public abstract byte getByte(long offset);
 
+  @Override
   public void putByte(int offset, byte value) {
     putByte((long) offset, value);
   }
 
+  @Override
   public abstract void putByte(long offset, byte value);
 
+  @Override
   public char getChar(int offset) {
     return getChar((long) offset);
   }
 
+  @Override
   public abstract char getChar(long offset);
 
+  @Override
   public void putChar(int offset, char value) {
     putChar((long) offset, value);
   }
 
+  @Override
   public abstract void putChar(long offset, char value);
 
+  @Override
   public short getShort(int offset) {
     return getShort((long) offset);
   }
 
+  @Override
   public abstract short getShort(long offset);
 
+  @Override
   public void putShort(int offset, short value) {
     putShort((long) offset, value);
   }
 
+  @Override
   public abstract void putShort(long offset, short value);
 
+  @Override
   public int getInt(int offset) {
     return getInt((long) offset);
   }
 
+  @Override
   public abstract int getInt(long offset);
 
+  @Override
   public void putInt(int offset, int value) {
     putInt((long) offset, value);
   }
 
+  @Override
   public abstract void putInt(long offset, int value);
 
+  @Override
   public long getLong(int offset) {
     return getLong((long) offset);
   }
 
+  @Override
   public abstract long getLong(long offset);
 
+  @Override
   public void putLong(int offset, long value) {
     putLong((long) offset, value);
   }
 
+  @Override
   public abstract void putLong(long offset, long value);
 
+  @Override
   public float getFloat(int offset) {
     return getFloat((long) offset);
   }
 
+  @Override
   public abstract float getFloat(long offset);
 
+  @Override
   public void putFloat(int offset, float value) {
     putFloat((long) offset, value);
   }
 
+  @Override
   public abstract void putFloat(long offset, float value);
 
+  @Override
   public double getDouble(int offset) {
     return getDouble((long) offset);
   }
 
+  @Override
   public abstract double getDouble(long offset);
 
+  @Override
   public void putDouble(int offset, double value) {
     putDouble((long) offset, value);
   }
 
+  @Override
   public abstract void putDouble(long offset, double value);
 
   /**
    * Given an array of bytes, copies the content of this object into the array 
of bytes.
    * The first byte to be copied is the one that could be read with {@code 
this.getByte(offset)}
    */
+  @Override
   public void copyTo(long offset, byte[] buffer, int destOffset, int size) {
     if (size <= BULK_BYTES_PROCESSING_THRESHOLD) {
       int end = destOffset + size;
@@ -485,6 +508,7 @@ public abstract class PinotDataBuffer implements Closeable {
    * Given an array of bytes, copies the content of this object into the array 
of bytes.
    * The first byte to be copied is the one that could be read with {@code 
this.getByte(offset)}
    */
+  @Override
   public void copyTo(long offset, byte[] buffer) {
+    // Fills the whole destination array: buffer.length bytes read starting at offset.
     copyTo(offset, buffer, 0, buffer.length);
   }
@@ -495,31 +519,51 @@ public abstract class PinotDataBuffer implements 
Closeable {
    * overriding priority, as when methods of this class are optimized by the 
runtime compiler, some or all checks
    * (if any) may be elided. Hence, the caller must not rely on the checks and 
corresponding exceptions!
    */
-  public void copyTo(long offset, PinotDataBuffer buffer, long destOffset, 
long size) {
-    int pageSize = Integer.MAX_VALUE;
-    long alreadyCopied = 0;
+  @Override
+  public void copyTo(long offset, DataBuffer buffer, long destOffset, long 
size) {
+    if (buffer instanceof PinotDataBuffer) {
+      // Fast path: both source and destination expose direct ByteBuffer views, so copy view-to-view in steps of
+      // at most Integer.MAX_VALUE bytes (a single ByteBuffer is int-indexed).
+      int pageSize = Integer.MAX_VALUE;
+      long alreadyCopied = 0;
 
-    while (size - alreadyCopied > 0L) {
-      int step;
-      long remaining = size - alreadyCopied;
+      while (size - alreadyCopied > 0L) {
+        int step;
+        long remaining = size - alreadyCopied;
 
-      if (remaining > pageSize) {
-        step = pageSize;
-      } else {
-        step = (int) remaining;
-      }
-      ByteBuffer destBb = buffer.toDirectByteBuffer(destOffset + 
alreadyCopied, step);
-      ByteBuffer myView = toDirectByteBuffer(offset + alreadyCopied, step);
+        if (remaining > pageSize) {
+          step = pageSize;
+        } else {
+          step = (int) remaining;
+        }
+        ByteBuffer destBb = ((PinotDataBuffer) 
buffer).toDirectByteBuffer(destOffset + alreadyCopied, step);
+        ByteBuffer myView = toDirectByteBuffer(offset + alreadyCopied, step);
 
-      destBb.put(myView);
+        destBb.put(myView);
 
-      alreadyCopied += step;
+        alreadyCopied += step;
+      }
+    } else {
+      // Generic path for any other DataBuffer implementation: stage each chunk of at most
+      // BULK_BYTES_PROCESSING_THRESHOLD bytes in a heap array, then write it into the destination with readFrom.
+      byte[] temp = new byte[BULK_BYTES_PROCESSING_THRESHOLD];
+      long alreadyCopied = 0;
+      while (size - alreadyCopied > 0L) {
+        int step;
+        long remaining = size - alreadyCopied;
+
+        if (remaining > BULK_BYTES_PROCESSING_THRESHOLD) {
+          step = BULK_BYTES_PROCESSING_THRESHOLD;
+        } else {
+          step = (int) remaining;
+        }
+        copyTo(offset + alreadyCopied, temp, 0, step);
+        buffer.readFrom(destOffset + alreadyCopied, temp, 0, step);
+        alreadyCopied += step;
+      }
     }
   }
 
   /**
    * Given an array of bytes, writes the content in the specified position.
    */
+  @Override
   public void readFrom(long offset, byte[] buffer, int srcOffset, int size) {
     if (offset + size > size()) {
       throw new IndexOutOfBoundsException("Buffer overflow: offset = " + 
offset + ", size = " + size
@@ -536,19 +580,20 @@ public abstract class PinotDataBuffer implements 
Closeable {
     }
   }
 
+  // Writes all of the given array into this buffer, starting at offset.
+  @Override
   public void readFrom(long offset, byte[] buffer) {
     readFrom(offset, buffer, 0, buffer.length);
   }
 
+  // Writes buffer.remaining() bytes into this buffer, starting at offset. ByteBuffer#put(ByteBuffer) advances the
+  // source buffer's position, so the argument is left with no remaining bytes.
+  @Override
   public void readFrom(long offset, ByteBuffer buffer) {
     toDirectByteBuffer(offset, buffer.remaining()).put(buffer);
   }
 
+  @Override
   public void readFrom(long offset, File file, long srcOffset, long size)
       throws IOException {
-    try (
-        RandomAccessFile raf = new RandomAccessFile(file, "r");
-        FileChannel fileChannel = raf.getChannel()) {
+    try (RandomAccessFile raf = new RandomAccessFile(file, "r"); FileChannel 
fileChannel = raf.getChannel()) {
       int step = Integer.MAX_VALUE / 2;
       while (size > Integer.MAX_VALUE) {
         ByteBuffer bb = toDirectByteBuffer(offset, step);
@@ -562,24 +607,34 @@ public abstract class PinotDataBuffer implements 
Closeable {
     }
   }
 
+  // Number of bytes in this buffer; readFrom(long, byte[], int, int) rejects writes whose end would exceed it.
+  @Override
   public abstract long size();
 
+  // Default byte order of this buffer; view(long, long) creates views with this order.
+  @Override
   public abstract ByteOrder order();
 
   /**
    * Creates a view of the range [start, end) of this buffer with the given 
byte order. Calling {@link #flush()} or
    * {@link #close()} has no effect on view.
    */
+  @Override
   public abstract PinotDataBuffer view(long start, long end, ByteOrder 
byteOrder);
 
   /**
    * Creates a view of the range [start, end) of this buffer with the current 
byte order. Calling {@link #flush()} or
    * {@link #close()} has no effect on view.
    */
+  @Override
   public PinotDataBuffer view(long start, long end) {
+    // The view inherits this buffer's current order().
     return view(start, end, order());
   }
 
+  /**
+   * Interprets the {@code length} bytes starting at {@code offset} as a serialized roaring bitmap.
+   * The bytes are always read as little-endian, independently of {@link #order()}.
+   */
+  @Override
+  public ImmutableRoaringBitmap viewAsRoaringBitmap(long offset, int length) {
+    return new ImmutableRoaringBitmap(toDirectByteBuffer(offset, length, ByteOrder.LITTLE_ENDIAN));
+  }
+
   /**
    * Returns an ByteBuffer with the same content of this buffer.
    *
@@ -616,9 +671,6 @@ public abstract class PinotDataBuffer implements Closeable {
    *   <li>A write made by either the receiver or the returned ByteBuffer will 
be seen by the other.</li>
    * </ol>
    *
-   * Depending on the implementation, this may be a view (and therefore 
changes on any buffer will be seen by the other)
-   * or a copy (in which case the cost will be higher, but each copy will have 
their own lifecycle).
-   *
    */
   // TODO: Most calls to this method are just used to then read the content of 
the buffer.
   //  This is unnecessary an generates 2-5 unnecessary objects. We should 
benchmark whether there is some advantage on
@@ -627,6 +679,23 @@ public abstract class PinotDataBuffer implements Closeable 
{
     return toDirectByteBuffer(offset, size, order());
   }
 
+  @Override
+  public ByteBuffer copyOrView(long offset, int size, ByteOrder byteOrder) {
+    return toDirectByteBuffer(offset, size, byteOrder);
+  }
+
+  /**
+   * Appends the whole content of this buffer to {@code appendTo} as a sequence of ByteBuffers.
+   * Each ByteBuffer is int-indexed, so the content is split into consecutive chunks of at most
+   * {@link Integer#MAX_VALUE} bytes.
+   *
+   * @param appendTo list that receives the chunks, in offset order
+   */
+  @Override
+  public void appendAsByteBuffers(List<ByteBuffer> appendTo) {
+    long size = size();
+    long offset = 0;
+    while (size - offset > 0) {
+      int byteBufferSize = (int) Math.min(size - offset, Integer.MAX_VALUE);
+      appendTo.add(copyOrView(offset, byteBufferSize));
+      // Advance by the chunk just appended, not by the total size, so buffers larger than 2 GiB
+      // are emitted in full.
+      offset += byteBufferSize;
+    }
+  }
+
+  // Implemented by subclasses. As documented on view(...), calling flush() on a view has no effect.
+  @Override
   public abstract void flush();
 
   public abstract void release()
@@ -644,8 +713,25 @@ public abstract class PinotDataBuffer implements Closeable 
{
       throw new IllegalArgumentException("Size " + size + " cannot be 
negative");
     }
     if (offset + size > capacity) {
-      throw new IllegalArgumentException("Size (" + size + ") + offset (" + 
offset + ") exceeds the capacity of "
-          + capacity);
+      throw new IllegalArgumentException(
+          "Size (" + size + ") + offset (" + offset + ") exceeds the capacity 
of " + capacity);
+    }
+  }
+
+  /**
+   * Content-based equality: {@code o} may be any {@link DataBuffer} implementation, not only a PinotDataBuffer.
+   * The comparison itself is delegated to {@link DataBuffer#sameContent}.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof DataBuffer)) {
+      return false;
     }
+    DataBuffer buffer = (DataBuffer) o;
+    return DataBuffer.sameContent(this, buffer);
+  }
+
+  // Delegates to DataBuffer.commonHashCode. Presumably other DataBuffer implementations use the same function, so
+  // buffers that are equal across types also hash equally; confirm in DataBuffer.
+  @Override
+  public int hashCode() {
+    return DataBuffer.commonHashCode(this);
   }
 }
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/PinotInputStream.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/PinotInputStream.java
new file mode 100644
index 0000000000..a7b8044b33
--- /dev/null
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/PinotInputStream.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.memory;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import org.apache.commons.lang3.StringUtils;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+
+/**
+ * A {@link DataInput} that also supports moving the cursor with {@link #seek(long)}, plus some other features
+ * used in Apache Pinot.
+ * <p>
+ * This class is based on Parquet's SeekableInputStream.
+ */
+public abstract class PinotInputStream extends InputStream implements DataInput {
+
+  /**
+   * Return the current position in the InputStream.
+   *
+   * @return current position in bytes from the start of the stream
+   */
+  public abstract long getCurrentOffset();
+
+  /**
+   * Seek to a new position in the InputStream.
+   *
+   * @param newPos the new position to seek to
+   * @throws IllegalArgumentException If the new position is negative or greater than the length of the stream
+   */
+  public abstract void seek(long newPos);
+
+  /**
+   * Read {@code buf.remaining()} bytes of data into a {@link ByteBuffer}.
+   * <p>
+   * This method will copy available bytes into the buffer, reading at most
+   * {@code buf.remaining()} bytes. The number of bytes actually copied is
+   * returned by the method, or -1 is returned to signal that the end of the
+   * underlying stream has been reached.
+   *
+   * @param buf a byte buffer to fill with data from the stream
+   * @return the number of bytes read or -1 if the stream ended. It may be 0 if there are no bytes available in the
+   *         stream
+   * @throws IOException If the underlying stream throws IOException
+   */
+  public abstract int read(ByteBuffer buf) throws IOException;
+
+  /**
+   * Reads a string stored as a 4-byte length prefix (read with {@link #readInt()}) followed by exactly that many
+   * bytes of UTF-8 data.
+   *
+   * @return the decoded string, or the empty string when the length prefix is 0
+   * @throws IOException if the length prefix is negative, if {@link #readFully(byte[])} cannot read the payload,
+   *         or if the underlying stream fails
+   */
+  public String readInt4UTF()
+      throws IOException {
+    int length = readInt();
+    if (length < 0) {
+      // A negative prefix can only come from corrupt or misaligned data. Report it as an IOException instead of
+      // the NegativeArraySizeException that allocating the array would raise.
+      throw new IOException("Invalid string length: " + length);
+    }
+    if (length == 0) {
+      return StringUtils.EMPTY;
+    }
+    byte[] bytes = new byte[length];
+    readFully(bytes);
+    return new String(bytes, UTF_8);
+  }
+
+  /**
+   * Returns the number of bytes that can still be read. Unlike {@link #available()}, the result is not capped at
+   * {@link Integer#MAX_VALUE}.
+   */
+  public abstract long availableLong();
+
+  /**
+   * Skips up to {@code n} bytes by seeking forward. The cursor never moves past the bytes reported by
+   * {@link #available()}.
+   *
+   * @return the number of bytes actually skipped; 0 when {@code n} is not positive
+   */
+  @Override
+  public int skipBytes(int n) {
+    if (n <= 0) {
+      return 0;
+    }
+    int step = Math.min(available(), n);
+    seek(getCurrentOffset() + step);
+    return step;
+  }
+
+  /**
+   * Returns {@link #availableLong()}, saturated to {@link Integer#MAX_VALUE} to fit the int return type.
+   */
+  @Override
+  public int available() {
+    long available = availableLong();
+    if (available > Integer.MAX_VALUE) {
+      return Integer.MAX_VALUE;
+    } else {
+      return (int) available;
+    }
+  }
+
+  @Override
+  public void readFully(byte[] b)
+      throws IOException {
+    readFully(b, 0, b.length);
+  }
+}
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/PinotOutputStream.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/PinotOutputStream.java
new file mode 100644
index 0000000000..db8a24313c
--- /dev/null
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/PinotOutputStream.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.memory;
+
+import com.google.common.primitives.Longs;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+
+
+/**
+ * An {@link OutputStream} that also implements {@link DataOutput} and whose write position can be moved with
+ * {@link #seek(long)}.
+ * <p>
+ * Multi-byte primitives are written in big-endian order, matching the {@link DataOutput} contract.
+ */
+public abstract class PinotOutputStream extends OutputStream implements DataOutput {
+
+  /**
+   * Return the current position in the OutputStream.
+   *
+   * @return current position in bytes from the start of the stream
+   */
+  public abstract long getCurrentOffset();
+
+  /**
+   * Seek to a new position in the OutputStream.
+   *
+   * @param newPos the new position to seek to
+   * @throws IllegalArgumentException If the new position is negative
+   */
+  public abstract void seek(long newPos);
+
+  /**
+   * Moves the current offset, applying the given change.
+   * @param change the change to apply to the current offset
+   * @throws IllegalArgumentException if the new position is negative
+   */
+  public void moveCurrentOffset(long change) {
+    long newOffset = getCurrentOffset() + change;
+    seek(newOffset);
+  }
+
+  @Override
+  public void writeBoolean(boolean v) throws IOException {
+    write(v ? 1 : 0);
+  }
+
+  @Override
+  public void writeByte(int v) throws IOException {
+    write(v);
+  }
+
+  @Override
+  public void writeChar(int v) throws IOException {
+    writeShort(v);
+  }
+
+  @Override
+  public void writeChars(String s) throws IOException {
+    for (int i = 0; i < s.length(); i++) {
+      writeChar(s.charAt(i));
+    }
+  }
+
+  @Override
+  public void writeDouble(double v) throws IOException {
+    writeLong(Double.doubleToLongBits(v));
+  }
+
+  @Override
+  public void writeFloat(float v) throws IOException {
+    writeInt(Float.floatToIntBits(v));
+  }
+
+  @Override
+  public void writeInt(int v) throws IOException {
+    write(0xFF & (v >> 24));
+    write(0xFF & (v >> 16));
+    write(0xFF & (v >> 8));
+    write(0xFF & v);
+  }
+
+  @Override
+  public void writeLong(long v) throws IOException {
+    // Longs.toByteArray returns the 8 bytes in big-endian order.
+    byte[] bytes = Longs.toByteArray(v);
+    write(bytes, 0, bytes.length);
+  }
+
+  @Override
+  public void writeShort(int v) throws IOException {
+    write(0xFF & (v >> 8));
+    write(0xFF & v);
+  }
+
+  /**
+   * This method is commonly used in Pinot to write a string with a 4-byte length prefix.
+   * <p>
+   * The prefix is the number of UTF-8 encoded bytes that follow, not the number of chars in {@code v}. This lets
+   * the value be read back with {@link PinotInputStream#readInt4UTF()}.
+   * <p>
+   * <b>Note</b>: This method is incompatible with {@link DataOutput#writeUTF(String)}. It is recommended to use
+   * this method instead of the one in {@link DataOutput} to write strings.
+   */
+  public void writeInt4String(String v) throws IOException {
+    // v.length() counts UTF-16 chars, which differs from the encoded size for any non-ASCII string. The reader
+    // consumes exactly 'prefix' bytes, so the prefix must be the byte length.
+    byte[] bytes = v.getBytes(StandardCharsets.UTF_8);
+    writeInt(bytes.length);
+    write(bytes);
+  }
+
+  /**
+   * Delegates to {@link DataOutputStream#writeBytes(String)} so the encoding matches the JDK exactly.
+   */
+  @Override
+  public void writeBytes(String s)
+      throws IOException {
+    // The wrapper is intentionally not closed: closing it would close this stream too.
+    new DataOutputStream(this).writeBytes(s);
+  }
+
+  /**
+   * Delegates to {@link DataOutputStream#writeUTF(String)}, producing the modified-UTF-8 format with a 2-byte
+   * length prefix.
+   */
+  @Override
+  public void writeUTF(String s)
+      throws IOException {
+    // The wrapper is intentionally not closed: closing it would close this stream too.
+    new DataOutputStream(this).writeUTF(s);
+  }
+
+  /**
+   * Writes the whole content of {@code input}.
+   */
+  public void write(DataBuffer input)
+      throws IOException {
+    write(input, 0, input.size());
+  }
+
+  /**
+   * Writes {@code length} bytes of {@code input}, starting at {@code offset}. The bytes pass through a 4096-byte
+   * staging array, so memory use does not depend on {@code length}.
+   */
+  public void write(DataBuffer input, long offset, long length)
+      throws IOException {
+    byte[] bytes = new byte[4096];
+    long currentOffset = offset;
+    while (currentOffset < offset + length) {
+      int bytesToRead = (int) Math.min(length - (currentOffset - offset), bytes.length);
+      input.copyTo(currentOffset, bytes, 0, bytesToRead);
+      write(bytes, 0, bytesToRead);
+      currentOffset += bytesToRead;
+    }
+  }
+}
diff --git 
a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/CompoundDataBufferTest.java
 
b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/CompoundDataBufferTest.java
new file mode 100644
index 0000000000..23a37d6fec
--- /dev/null
+++ 
b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/CompoundDataBufferTest.java
@@ -0,0 +1,313 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.memory;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.Random;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+/**
+ * Unit tests for {@link CompoundDataBuffer}. Every test starts from a compound buffer made of three zero-filled
+ * {@link PinotByteBuffer}s of {@code BUFFER_SIZE} bytes each. The tests cover accesses that stay inside one
+ * underlying buffer and accesses that cross the boundary between two of them.
+ */
+public class CompoundDataBufferTest {
+
+  private CompoundDataBuffer _compoundDataBuffer;
+  private static final int BUFFER_SIZE = 32;
+
+  // Rebuilt before every test method so that tests never observe each other's writes.
+  @BeforeMethod
+  public void setUp() {
+    _compoundDataBuffer = new CompoundDataBuffer(new DataBuffer[]{
+        PinotByteBuffer.wrap(new byte[BUFFER_SIZE]),
+        PinotByteBuffer.wrap(new byte[BUFFER_SIZE]),
+        PinotByteBuffer.wrap(new byte[BUFFER_SIZE])},
+        ByteOrder.BIG_ENDIAN,
+        true
+    );
+  }
+
+  // Rows of {type name, value}. The seeded Random keeps the values identical across runs.
+  @DataProvider
+  Object[][] thingsToWrite() {
+    Random r = new Random(42);
+    byte[] bytes = new byte[BUFFER_SIZE];
+    r.nextBytes(bytes);
+    return new Object[][] {
+        {"int", r.nextInt()},
+        {"float", r.nextFloat()},
+        {"long", r.nextLong()},
+        {"double", r.nextDouble()},
+        {"bytes", bytes},
+        {"char", 'a'},
+        {"short", (short) r.nextInt()},
+        {"byte", (byte) r.nextInt()}
+    };
+  }
+
+  // Offset at which a write of writeSize bytes fits entirely inside the first underlying buffer.
+  private int positionToWriteInSingleBuffer(int writeSize) {
+    if (writeSize > BUFFER_SIZE) {
+      throw new IllegalArgumentException("Write size " + writeSize + " is 
greater than buffer size " + BUFFER_SIZE);
+    }
+    return 0;
+  }
+
+  // Offset at which a write of writeSize bytes ends on the first byte of the second underlying buffer. Any write of
+  // two or more bytes therefore straddles the boundary between the first and second buffers.
+  private int positionToWriteTwoBuffer(int writeSize) {
+    if (writeSize > 2 * BUFFER_SIZE) {
+      throw new IllegalArgumentException("Write size " + writeSize + " is 
greater than " + (2 * BUFFER_SIZE));
+    }
+    return BUFFER_SIZE - writeSize + 1;
+  }
+
+  // Round-trips each value type at an offset that stays inside the first buffer.
+  @Test(dataProvider = "thingsToWrite")
+  public void writeOnSingleBuffer(String dataType, Object value) {
+    int position;
+    switch (dataType) {
+      case "int":
+        position = positionToWriteInSingleBuffer(Integer.BYTES);
+        _compoundDataBuffer.putInt(position, (int) value);
+        assertEquals(_compoundDataBuffer.getInt(position), value);
+        break;
+      case "float":
+        position = positionToWriteInSingleBuffer(Float.BYTES);
+        _compoundDataBuffer.putFloat(position, (float) value);
+        assertEquals(_compoundDataBuffer.getFloat(position), value);
+        break;
+      case "long":
+        position = positionToWriteInSingleBuffer(Long.BYTES);
+        _compoundDataBuffer.putLong(position, (long) value);
+        assertEquals(_compoundDataBuffer.getLong(position), value);
+        break;
+      case "double":
+        position = positionToWriteInSingleBuffer(Double.BYTES);
+        _compoundDataBuffer.putDouble(position, (double) value);
+        assertEquals(_compoundDataBuffer.getDouble(position), value);
+        break;
+      case "char":
+        position = positionToWriteInSingleBuffer(Character.BYTES);
+        _compoundDataBuffer.putChar(position, (char) value);
+        assertEquals(_compoundDataBuffer.getChar(position), value);
+        break;
+      case "short":
+        position = positionToWriteInSingleBuffer(Short.BYTES);
+        _compoundDataBuffer.putShort(position, (short) value);
+        assertEquals(_compoundDataBuffer.getShort(position), value);
+        break;
+      case "byte":
+        position = positionToWriteInSingleBuffer(Byte.BYTES);
+        _compoundDataBuffer.putByte(position, (byte) value);
+        assertEquals(_compoundDataBuffer.getByte(position), value);
+        break;
+      case "bytes":
+        byte[] bytes = (byte[]) value;
+        position = positionToWriteInSingleBuffer(bytes.length);
+        _compoundDataBuffer.readFrom(position, bytes);
+        byte[] readBytes = new byte[bytes.length];
+        _compoundDataBuffer.copyTo(position, readBytes);
+        assertEquals(readBytes, bytes);
+        break;
+      default:
+        throw new IllegalArgumentException("Unsupported data type " + 
dataType);
+    }
+  }
+
+  // Same round trip, but at an offset where the value crosses from the first into the second buffer.
+  @Test(dataProvider = "thingsToWrite")
+  public void writeBetweenBuffers(String dataType, Object value) {
+    int position;
+    switch (dataType) {
+      case "int":
+        position = positionToWriteTwoBuffer(Integer.BYTES);
+        _compoundDataBuffer.putInt(position, (int) value);
+        assertEquals(_compoundDataBuffer.getInt(position), value);
+        break;
+      case "long":
+        position = positionToWriteTwoBuffer(Long.BYTES);
+        _compoundDataBuffer.putLong(position, (long) value);
+        assertEquals(_compoundDataBuffer.getLong(position), value);
+        break;
+      case "float":
+        position = positionToWriteTwoBuffer(Float.BYTES);
+        _compoundDataBuffer.putFloat(position, (float) value);
+        assertEquals(_compoundDataBuffer.getFloat(position), value);
+        break;
+      case "double":
+        position = positionToWriteTwoBuffer(Double.BYTES);
+        _compoundDataBuffer.putDouble(position, (double) value);
+        assertEquals(_compoundDataBuffer.getDouble(position), value);
+        break;
+      case "char":
+        position = positionToWriteTwoBuffer(Character.BYTES);
+        _compoundDataBuffer.putChar(position, (char) value);
+        assertEquals(_compoundDataBuffer.getChar(position), value);
+        break;
+      case "short":
+        position = positionToWriteTwoBuffer(Short.BYTES);
+        _compoundDataBuffer.putShort(position, (short) value);
+        assertEquals(_compoundDataBuffer.getShort(position), value);
+        break;
+      case "byte":
+        position = positionToWriteTwoBuffer(Byte.BYTES);
+        _compoundDataBuffer.putByte(position, (byte) value);
+        assertEquals(_compoundDataBuffer.getByte(position), value);
+        break;
+      case "bytes":
+        byte[] bytes = (byte[]) value;
+        position = positionToWriteTwoBuffer(bytes.length);
+        _compoundDataBuffer.readFrom(position, bytes);
+        byte[] readBytes = new byte[bytes.length];
+        _compoundDataBuffer.copyTo(position, readBytes);
+        assertEquals(readBytes, bytes);
+        break;
+      default:
+        throw new IllegalArgumentException("Unsupported data type " + 
dataType);
+    }
+  }
+
+  // Bulk ByteBuffer write followed by a bulk ByteBuffer read, across the first/second buffer boundary.
+  @Test
+  void readWriteByteBuffer() {
+    Random r = new Random(42);
+    byte[] bytes = new byte[BUFFER_SIZE];
+
+    int position = positionToWriteTwoBuffer(bytes.length);
+
+    r.nextBytes(bytes);
+    ByteBuffer buffer = ByteBuffer.wrap(bytes);
+    _compoundDataBuffer.readFrom(position, buffer);
+    ByteBuffer readBuffer = ByteBuffer.allocate(BUFFER_SIZE);
+    _compoundDataBuffer.copyTo(position, readBuffer, 0, BUFFER_SIZE);
+    assertEquals(readBuffer.array(), bytes);
+  }
+
+  // Loads BUFFER_SIZE bytes from a temporary file into a range that crosses the first/second buffer boundary.
+  @Test
+  void readWriteFile()
+      throws IOException {
+    Random r = new Random(42);
+    byte[] bytes = new byte[BUFFER_SIZE];
+
+    int position = positionToWriteTwoBuffer(bytes.length);
+
+    r.nextBytes(bytes);
+    File file = Files.createTempFile("test", "data").toFile();
+    try {
+      try (RandomAccessFile randomAccessFile = new RandomAccessFile(file, 
"rw")) {
+        randomAccessFile.setLength(BUFFER_SIZE);
+        try (BufferedOutputStream bos = new BufferedOutputStream(new 
FileOutputStream(file))) {
+          bos.write(bytes);
+        }
+      }
+      _compoundDataBuffer.readFrom(position, file, 0, BUFFER_SIZE);
+      byte[] readBytes = new byte[BUFFER_SIZE];
+      _compoundDataBuffer.copyTo(position, readBytes);
+      assertEquals(readBytes, bytes);
+    } finally {
+      file.delete();
+    }
+  }
+
+  // A view over the full range must keep the byte order and compare equal to the compound buffer itself.
+  @Test
+  void testViewTotal() {
+    Random r = new Random(42);
+    byte[] bytes1 = new byte[BUFFER_SIZE];
+    r.nextBytes(bytes1);
+
+    byte[] bytes2 = new byte[BUFFER_SIZE];
+    r.nextBytes(bytes2);
+
+    byte[] bytes3 = new byte[BUFFER_SIZE];
+    r.nextBytes(bytes3);
+
+    _compoundDataBuffer.readFrom(0, bytes1);
+    _compoundDataBuffer.readFrom(BUFFER_SIZE, bytes2);
+    _compoundDataBuffer.readFrom(BUFFER_SIZE * 2L, bytes3);
+
+    DataBuffer view = _compoundDataBuffer.view(0, BUFFER_SIZE * 3L);
+    assertEquals(view.size(), BUFFER_SIZE * 3L);
+    assertEquals(view.order(), ByteOrder.BIG_ENDIAN);
+    assertEquals(view, _compoundDataBuffer);
+  }
+
+  // The view [1, 2 * BUFFER_SIZE + 1) covers the tail of the first buffer, all of the second buffer and the
+  // first byte of the third; totalBytes is assembled with that same layout.
+  @Test
+  void testViewSeveralBuffers() {
+    Random r = new Random(42);
+    byte[] bytes1 = new byte[BUFFER_SIZE];
+    r.nextBytes(bytes1);
+
+    byte[] bytes2 = new byte[BUFFER_SIZE];
+    r.nextBytes(bytes2);
+
+    byte[] bytes3 = new byte[BUFFER_SIZE];
+    r.nextBytes(bytes3);
+
+    byte[] totalBytes = new byte[BUFFER_SIZE * 2];
+
+    System.arraycopy(bytes1, 1, totalBytes, 0, BUFFER_SIZE - 1);
+    System.arraycopy(bytes2, 0, totalBytes, BUFFER_SIZE - 1, BUFFER_SIZE);
+    System.arraycopy(bytes3, 0, totalBytes, BUFFER_SIZE * 2 - 1, 1);
+
+    DataBuffer expected = PinotByteBuffer.wrap(totalBytes);
+
+    // prepare the compound buffer
+    _compoundDataBuffer.readFrom(0, bytes1);
+    _compoundDataBuffer.readFrom(BUFFER_SIZE, bytes2);
+    _compoundDataBuffer.readFrom(BUFFER_SIZE * 2L, bytes3);
+
+    // apply the view
+    DataBuffer view = _compoundDataBuffer.view(1, BUFFER_SIZE * 2L + 1);
+    assertEquals(view.size(), BUFFER_SIZE * 2L);
+    assertEquals(view.order(), ByteOrder.BIG_ENDIAN);
+
+    assertEquals(view, expected);
+  }
+
+  // The view [1, BUFFER_SIZE - 1) lies entirely inside the first underlying buffer.
+  @Test
+  void testViewOneBuffer() {
+    Random r = new Random(42);
+    byte[] bytes1 = new byte[BUFFER_SIZE];
+    r.nextBytes(bytes1);
+
+    _compoundDataBuffer.readFrom(0, bytes1);
+
+    DataBuffer expected = PinotByteBuffer.wrap(Arrays.copyOfRange(bytes1, 1, 
BUFFER_SIZE - 1));
+
+    DataBuffer view = _compoundDataBuffer.view(1, BUFFER_SIZE - 1);
+    assertEquals(view.size(), BUFFER_SIZE - 2);
+    assertEquals(view, expected);
+  }
+
+  // copyOrView of a range inside the first buffer must hold the same remaining bytes as wrapping the source array.
+  @Test
+  void testCopyOrViewOneBuffer() {
+    Random r = new Random(42);
+    byte[] bytes1 = new byte[BUFFER_SIZE];
+    r.nextBytes(bytes1);
+
+    _compoundDataBuffer.readFrom(0, bytes1);
+
+    ByteBuffer expected = ByteBuffer.wrap(bytes1, 1, BUFFER_SIZE - 1);
+
+    ByteBuffer copyOrView = _compoundDataBuffer.copyOrView(1, BUFFER_SIZE - 1);
+    assertEquals(copyOrView, expected);
+  }
+}
diff --git 
a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/DataBufferPinotInputStreamTest.java
 
b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/DataBufferPinotInputStreamTest.java
new file mode 100644
index 0000000000..89318e22c8
--- /dev/null
+++ 
b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/DataBufferPinotInputStreamTest.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.memory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Random;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+/**
+ * Tests for {@link DataBufferPinotInputStream} reading a 32-byte, big-endian {@link PinotByteBuffer} filled with
+ * seeded random bytes. Each test compares the stream's result with the same value read directly from the backing
+ * {@link ByteBuffer}, and checks how far the cursor moved.
+ */
+public class DataBufferPinotInputStreamTest {
+
+  ByteBuffer _byteBuffer;
+  DataBuffer _dataBuffer;
+  DataBufferPinotInputStream _dataBufferPinotInputStream;
+  private static final int BUFFER_SIZE = 32;
+
+  @BeforeMethod
+  public void setUp() {
+    Random r = new Random(42);
+
+    byte[] buffer = new byte[BUFFER_SIZE];
+    r.nextBytes(buffer);
+
+    _byteBuffer = ByteBuffer.wrap(buffer).order(ByteOrder.BIG_ENDIAN);
+    _dataBuffer = PinotByteBuffer.wrap(_byteBuffer);
+    _dataBufferPinotInputStream = new DataBufferPinotInputStream(_dataBuffer);
+  }
+
+  @Test
+  void readByteBuffer() {
+    byte[] buffer = new byte[BUFFER_SIZE];
+    ByteBuffer wrap = ByteBuffer.wrap(buffer);
+    _dataBufferPinotInputStream.read(wrap);
+
+    assertEquals(wrap, _byteBuffer);
+    assertEquals(_dataBufferPinotInputStream.getCurrentOffset(), BUFFER_SIZE);
+  }
+
+  @Test
+  void readByteMovesCursor() {
+    int read = _dataBufferPinotInputStream.read();
+    assertEquals(read, _byteBuffer.get(0) & 0xFF);
+    assertEquals(_dataBufferPinotInputStream.getCurrentOffset(), 1);
+  }
+
+  @Test
+  void seekMovesCursor() {
+    _dataBufferPinotInputStream.seek(1);
+    assertEquals(_dataBufferPinotInputStream.getCurrentOffset(), 1);
+
+    int read = _dataBufferPinotInputStream.read();
+    assertEquals(read, _byteBuffer.get(1) & 0xFF);
+    assertEquals(_dataBufferPinotInputStream.getCurrentOffset(), 2);
+  }
+
+  @Test
+  void readAtEndReturnsMinusOne() {
+    _dataBufferPinotInputStream.seek(BUFFER_SIZE);
+    assertEquals(_dataBufferPinotInputStream.read(), -1);
+  }
+
+  @Test
+  void testSkip() {
+    _dataBufferPinotInputStream.seek(1);
+    long skip = _dataBufferPinotInputStream.skip(2);
+    assertEquals(skip, 2);
+    assertEquals(_dataBufferPinotInputStream.getCurrentOffset(), 3);
+  }
+
+  // Skipping past the end is clamped to the bytes actually left.
+  @Test
+  void testSkipEnd() {
+    _dataBufferPinotInputStream.seek(BUFFER_SIZE - 1);
+    long skip = _dataBufferPinotInputStream.skip(2);
+    assertEquals(skip, 1);
+    assertEquals(_dataBufferPinotInputStream.getCurrentOffset(), BUFFER_SIZE);
+  }
+
+  @Test
+  void testAvailable() {
+    assertEquals(_dataBufferPinotInputStream.available(), BUFFER_SIZE);
+    _dataBufferPinotInputStream.seek(1);
+    assertEquals(_dataBufferPinotInputStream.available(), BUFFER_SIZE - 1);
+  }
+
+  @Test
+  void testReadInt()
+      throws EOFException {
+    int read = _dataBufferPinotInputStream.readInt();
+
+    assertEquals(read, _byteBuffer.getInt(0));
+    assertEquals(_dataBufferPinotInputStream.getCurrentOffset(), Integer.BYTES);
+  }
+
+  @Test
+  void testReadLong()
+      throws EOFException {
+    long read = _dataBufferPinotInputStream.readLong();
+
+    assertEquals(read, _byteBuffer.getLong(0));
+    assertEquals(_dataBufferPinotInputStream.getCurrentOffset(), Long.BYTES);
+  }
+
+  @Test
+  void testReadShort()
+      throws EOFException {
+    short read = _dataBufferPinotInputStream.readShort();
+
+    assertEquals(read, _byteBuffer.getShort(0));
+    assertEquals(_dataBufferPinotInputStream.getCurrentOffset(), Short.BYTES);
+  }
+
+  @Test
+  void testReadBoolean()
+      throws EOFException {
+    boolean read = _dataBufferPinotInputStream.readBoolean();
+
+    assertEquals(read, _byteBuffer.get(0) != 0);
+    assertEquals(_dataBufferPinotInputStream.getCurrentOffset(), 1);
+  }
+
+  @Test
+  void testReadByte()
+      throws EOFException {
+    // DataInput#readByte returns a *signed* byte, so compare it with the signed value. Masking with 0xFF (the
+    // readUnsignedByte semantics) would make this assertion fail whenever the first byte is negative.
+    byte read = _dataBufferPinotInputStream.readByte();
+
+    assertEquals(read, _byteBuffer.get(0));
+    assertEquals(_dataBufferPinotInputStream.getCurrentOffset(), 1);
+  }
+
+  @Test
+  void testReadUnsignedByte()
+      throws EOFException {
+    int read = _dataBufferPinotInputStream.readUnsignedByte();
+
+    assertEquals(read, _byteBuffer.get(0) & 0xFF);
+    assertEquals(_dataBufferPinotInputStream.getCurrentOffset(), 1);
+  }
+
+  @Test
+  void testReadUnsignedShort()
+      throws EOFException {
+    int read = _dataBufferPinotInputStream.readUnsignedShort();
+
+    assertEquals(read, _byteBuffer.getShort(0) & 0xFFFF);
+    assertEquals(_dataBufferPinotInputStream.getCurrentOffset(), Short.BYTES);
+  }
+
+  @Test
+  void testReadFloat()
+      throws EOFException {
+    float read = _dataBufferPinotInputStream.readFloat();
+
+    assertEquals(read, _byteBuffer.getFloat(0));
+    assertEquals(_dataBufferPinotInputStream.getCurrentOffset(), Float.BYTES);
+  }
+
+  @Test
+  void testReadDouble()
+      throws EOFException {
+    double read = _dataBufferPinotInputStream.readDouble();
+
+    assertEquals(read, _byteBuffer.getDouble(0));
+    assertEquals(_dataBufferPinotInputStream.getCurrentOffset(), Double.BYTES);
+  }
+
+  @Test
+  void testReadChar()
+      throws EOFException {
+    char read = _dataBufferPinotInputStream.readChar();
+
+    assertEquals(read, _byteBuffer.getChar(0));
+    assertEquals(_dataBufferPinotInputStream.getCurrentOffset(), Character.BYTES);
+  }
+
+  @Test
+  void testReadFully()
+      throws IOException {
+    byte[] buffer = new byte[BUFFER_SIZE];
+    _dataBufferPinotInputStream.readFully(buffer);
+
+    for (int i = 0; i < BUFFER_SIZE; i++) {
+      assertEquals(buffer[i], _byteBuffer.get(i));
+    }
+    assertEquals(_dataBufferPinotInputStream.getCurrentOffset(), BUFFER_SIZE);
+  }
+}
diff --git 
a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PagedPinotOutputStreamTest.java
 
b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PagedPinotOutputStreamTest.java
new file mode 100644
index 0000000000..f496511f5f
--- /dev/null
+++ 
b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PagedPinotOutputStreamTest.java
@@ -0,0 +1,314 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.memory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Random;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+public class PagedPinotOutputStreamTest {
+  // Page size of the stream under test, cached so tests can address page boundaries directly.
+  private int _pageSize;
+  private PagedPinotOutputStream _pagedPinotOutputStream;
+
+  @BeforeMethod
+  public void setUp() {
+    _pagedPinotOutputStream = PagedPinotOutputStream.createHeap();
+    _pageSize = _pagedPinotOutputStream.getPageSize();
+  }
+
+  @Test
+  void writeByteInPage()
+      throws IOException {
+    byte b = 42;
+    _pagedPinotOutputStream.write(b);
+    assertEquals(_pagedPinotOutputStream.getCurrentOffset(), 1);
+    // The byte must be readable back at offset 0 of the buffer view.
+    byte read = _pagedPinotOutputStream.asBuffer(ByteOrder.BIG_ENDIAN, false).getByte(0);
+    assertEquals(read, b);
+  }
+
+  @Test
+  void writeByteAcrossPage()
+      throws IOException {
+    byte b = 42;
+    // Position at the first byte of the second page, so the write lands in a new page.
+    _pagedPinotOutputStream.seek(_pageSize);
+    _pagedPinotOutputStream.write(b);
+
+    assertEquals(_pagedPinotOutputStream.getPages().length, 2);
+
+    byte read = _pagedPinotOutputStream.asBuffer(ByteOrder.BIG_ENDIAN, false).getByte(_pageSize);
+    assertEquals(read, b);
+  }
+
+  @Test
+  void writeIntInPage()
+      throws IOException {
+    int i = 42;
+    _pagedPinotOutputStream.writeInt(i);
+    assertEquals(_pagedPinotOutputStream.getCurrentOffset(), Integer.BYTES);
+    // Read back through a big-endian view of the written pages.
+    int read = _pagedPinotOutputStream.asBuffer(ByteOrder.BIG_ENDIAN, false).getInt(0);
+    assertEquals(read, i);
+  }
+
+  @Test
+  void writeIntAcrossPage()
+      throws IOException {
+    int i = 42;
+    // Leave only Integer.BYTES - 1 bytes in the first page so the int straddles two pages.
+    _pagedPinotOutputStream.seek(_pageSize - Integer.BYTES + 1);
+    _pagedPinotOutputStream.writeInt(i);
+
+    assertEquals(_pagedPinotOutputStream.getPages().length, 2);
+
+    int read = _pagedPinotOutputStream.asBuffer(ByteOrder.BIG_ENDIAN, false).getInt(_pageSize - Integer.BYTES + 1);
+    assertEquals(read, i);
+  }
+
+  @Test
+  void writeLongInPage()
+      throws IOException {
+    long l = 42;
+    _pagedPinotOutputStream.writeLong(l);
+    assertEquals(_pagedPinotOutputStream.getCurrentOffset(), Long.BYTES);
+    // Read back through a big-endian view of the written pages.
+    long read = _pagedPinotOutputStream.asBuffer(ByteOrder.BIG_ENDIAN, false).getLong(0);
+    assertEquals(read, l);
+  }
+
+  @Test
+  void writeLongAcrossPage()
+      throws IOException {
+    long l = 42;
+    // Leave only Long.BYTES - 1 bytes in the first page so the long straddles two pages.
+    _pagedPinotOutputStream.seek(_pageSize - Long.BYTES + 1);
+    _pagedPinotOutputStream.writeLong(l);
+
+    assertEquals(_pagedPinotOutputStream.getPages().length, 2);
+
+    long read = _pagedPinotOutputStream.asBuffer(ByteOrder.BIG_ENDIAN, false).getLong(_pageSize - Long.BYTES + 1);
+    assertEquals(read, l);
+  }
+
+  @Test
+  void writeShortInPage()
+      throws IOException {
+    short s = 42;
+    _pagedPinotOutputStream.writeShort(s);
+    assertEquals(_pagedPinotOutputStream.getCurrentOffset(), Short.BYTES);
+    // Read back through a big-endian view of the written pages.
+    short read = _pagedPinotOutputStream.asBuffer(ByteOrder.BIG_ENDIAN, false).getShort(0);
+    assertEquals(read, s);
+  }
+
+  @Test
+  void writeShortAcrossPage()
+      throws IOException {
+    short s = 42;
+    // Leave only Short.BYTES - 1 bytes in the first page so the short straddles two pages.
+    _pagedPinotOutputStream.seek(_pageSize - Short.BYTES + 1);
+    _pagedPinotOutputStream.writeShort(s);
+
+    assertEquals(_pagedPinotOutputStream.getPages().length, 2);
+
+    short read = _pagedPinotOutputStream.asBuffer(ByteOrder.BIG_ENDIAN, false).getShort(_pageSize - Short.BYTES + 1);
+    assertEquals(read, s);
+  }
+
+  @Test
+  void seekIntoPageStart()
+      throws IOException {
+    int i = 42;
+    _pagedPinotOutputStream.seek(_pageSize);
+    assertEquals(_pagedPinotOutputStream.getCurrentOffset(), _pageSize);
+    // A write issued exactly at a page boundary must land at that absolute offset.
+    _pagedPinotOutputStream.writeInt(i);
+
+    int read = _pagedPinotOutputStream.asBuffer(ByteOrder.BIG_ENDIAN, false).getInt(_pageSize);
+    assertEquals(read, i);
+  }
+
+  @Test
+  void seekIntoNegative() {
+    assertThrows(IllegalArgumentException.class, () -> _pagedPinotOutputStream.seek(-1));
+  }
+
+  @Test
+  void seekIntoNotExistentPage()
+      throws IOException {
+    int i = 42;
+    long newPosition = _pageSize * 10L + 2;
+    _pagedPinotOutputStream.seek(newPosition);
+    // Seeking far beyond any written data must still allow writing at that offset.
+    _pagedPinotOutputStream.writeInt(i);
+
+    int read = _pagedPinotOutputStream.asBuffer(ByteOrder.BIG_ENDIAN, false).getInt(newPosition);
+    assertEquals(read, i);
+  }
+
+  @Test
+  void seekIntoStart()
+      throws IOException {
+    int i = 42;
+    _pagedPinotOutputStream.seek(123);
+    _pagedPinotOutputStream.seek(0);
+    // After moving forward and back to 0, the next write must start at offset 0.
+    _pagedPinotOutputStream.writeInt(i);
+
+    int read = _pagedPinotOutputStream.asBuffer(ByteOrder.BIG_ENDIAN, false).getInt(0);
+    assertEquals(read, i);
+  }
+
+  @Test
+  void writeLargeByteArray()
+      throws IOException {
+    Random r = new Random(42);
+    byte[] bytes = new byte[_pageSize + 1];
+    r.nextBytes(bytes);
+    // One byte more than a page, so the array must be split across two pages.
+    _pagedPinotOutputStream.write(bytes);
+
+    byte[] read = new byte[_pageSize + 1];
+    _pagedPinotOutputStream.asBuffer(ByteOrder.BIG_ENDIAN, false).copyTo(0, read);
+
+    assertEquals(read, bytes);
+    assertEquals(_pagedPinotOutputStream.getPages().length, 2);
+  }
+
+  @Test
+  void writeSmallByteArray()
+      throws IOException {
+    Random r = new Random(42);
+    byte[] bytes = new byte[_pageSize / 2];
+    r.nextBytes(bytes);
+    // Half a page fits entirely in the first page.
+    _pagedPinotOutputStream.write(bytes);
+
+    byte[] read = new byte[_pageSize / 2];
+    _pagedPinotOutputStream.asBuffer(ByteOrder.BIG_ENDIAN, false).copyTo(0, read);
+
+    assertEquals(read, bytes);
+    assertEquals(_pagedPinotOutputStream.getPages().length, 1);
+  }
+
+  @Test
+  void writeLargeDataInput()
+      throws IOException {
+    Random r = new Random(42);
+    byte[] bytes = new byte[_pageSize + 1];
+    r.nextBytes(bytes);
+    // Same as writeLargeByteArray, but the source is a PinotByteBuffer wrapping the array.
+    _pagedPinotOutputStream.write(PinotByteBuffer.wrap(bytes));
+
+    byte[] read = new byte[_pageSize + 1];
+    _pagedPinotOutputStream.asBuffer(ByteOrder.BIG_ENDIAN, false).copyTo(0, read);
+
+    assertEquals(read, bytes);
+    assertEquals(_pagedPinotOutputStream.getPages().length, 2);
+  }
+
+  @Test
+  void writeSmallDataInput()
+      throws IOException {
+    Random r = new Random(42);
+    byte[] bytes = new byte[_pageSize / 2];
+    r.nextBytes(bytes);
+    // Same as writeSmallByteArray, but the source is a PinotByteBuffer wrapping the array.
+    _pagedPinotOutputStream.write(PinotByteBuffer.wrap(bytes));
+
+    byte[] read = new byte[_pageSize / 2];
+    _pagedPinotOutputStream.asBuffer(ByteOrder.BIG_ENDIAN, false).copyTo(0, read);
+
+    assertEquals(read, bytes);
+    assertEquals(_pagedPinotOutputStream.getPages().length, 1);
+  }
+
+  @Test
+  void testGetPagesEmpty() {
+    ByteBuffer[] pages = _pagedPinotOutputStream.getPages();
+    assertEquals(pages.length, 0);
+  }
+
+  @Test
+  void testGetPagesSingleIntWrite()
+      throws IOException {
+    _pagedPinotOutputStream.writeInt(42);
+    // Only the written bytes are exposed: a single page whose limit is the int's size.
+    ByteBuffer[] pages = _pagedPinotOutputStream.getPages();
+    assertEquals(pages.length, 1);
+    assertEquals(pages[0].position(), 0);
+    assertEquals(pages[0].limit(), 4);
+
+    assertEquals(pages[0].getInt(0), 42);
+  }
+
+  @Test
+  void testGetPagesAfterSeekingInTheMiddleOfAPage() {
+    _pagedPinotOutputStream.seek(_pageSize * 2L + 1);
+    // Seeking (even without writing) one byte into the third page exposes three pages.
+    ByteBuffer[] pages = _pagedPinotOutputStream.getPages();
+    assertEquals(pages.length, 3);
+
+    assertEquals(pages[0].position(), 0);
+    assertEquals(pages[0].limit(), _pageSize);
+
+    assertEquals(pages[1].position(), 0);
+    assertEquals(pages[1].limit(), _pageSize);
+
+    assertEquals(pages[2].position(), 0);
+    assertEquals(pages[2].limit(), 1);
+  }
+
+  @Test
+  void testGetPagesAfterSeekingInTheStartOfAPage() {
+    _pagedPinotOutputStream.seek(_pageSize * 2L);
+    // Seeking exactly to a page boundary does not expose the following page.
+    ByteBuffer[] pages = _pagedPinotOutputStream.getPages();
+    assertEquals(pages.length, 2);
+
+    assertEquals(pages[0].position(), 0);
+    assertEquals(pages[0].limit(), _pageSize);
+
+    assertEquals(pages[1].position(), 0);
+    assertEquals(pages[1].limit(), _pageSize);
+  }
+
+  @Test
+  void testGetPagesUsesWrittenInsteadOfOffset() {
+    _pagedPinotOutputStream.seek(_pageSize * 2L);
+    _pagedPinotOutputStream.seek(1);
+    // Seeking backwards must not shrink the exposed pages: the furthest position reached wins.
+    ByteBuffer[] pages = _pagedPinotOutputStream.getPages();
+    assertEquals(pages.length, 2);
+
+    assertEquals(pages[0].position(), 0);
+    assertEquals(pages[0].limit(), _pageSize);
+
+    assertEquals(pages[1].position(), 0);
+    assertEquals(pages[1].limit(), _pageSize);
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to