This is an automated email from the ASF dual-hosted git repository.

tingchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 687770ee3e1 [Timeseries-Storage] add Delta and DeltaDelta storage 
encoding (#15258)
687770ee3e1 is described below

commit 687770ee3e1b1042e954bc8f7b3e9b53997ab64c
Author: Qiaochu Liu <[email protected]>
AuthorDate: Thu Nov 20 11:21:18 2025 -0800

    [Timeseries-Storage] add Delta and DeltaDelta storage encoding (#15258)
    
    * [Timeseries-Storage] add Delta and DeltaDelta storage encoding
    
    * Add support for Int
    
    * fix tests
    
    * fix tests
---
 .../io/compression/ChunkCompressorFactory.java     |  14 +-
 .../local/io/compression/DeltaCompressor.java      | 199 ++++++++++++++++++
 .../local/io/compression/DeltaDecompressor.java    | 151 ++++++++++++++
 .../local/io/compression/DeltaDeltaCompressor.java | 231 +++++++++++++++++++++
 .../io/compression/DeltaDeltaDecompressor.java     | 169 +++++++++++++++
 .../forward/BaseChunkForwardIndexReader.java       |  10 +-
 .../local/io/compression/DeltaCompressionTest.java | 228 ++++++++++++++++++++
 .../io/compression/DeltaDeltaCompressionTest.java  | 200 ++++++++++++++++++
 .../impl/VarByteChunkSVForwardIndexWriterTest.java |   4 +-
 .../MultiValueFixedByteRawIndexCreatorTest.java    |   1 +
 .../MultiValueVarByteRawIndexCreatorTest.java      |   1 +
 .../forward/FixedByteChunkSVForwardIndexTest.java  |   1 +
 .../spi/compression/ChunkCompressionType.java      |   2 +-
 .../segment/spi/index/ForwardIndexConfig.java      |  12 ++
 .../apache/pinot/spi/config/table/FieldConfig.java |   5 +-
 15 files changed, 1223 insertions(+), 5 deletions(-)

diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ChunkCompressorFactory.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ChunkCompressorFactory.java
index 15def2f733b..33ef239583a 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ChunkCompressorFactory.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ChunkCompressorFactory.java
@@ -71,6 +71,12 @@ public class ChunkCompressorFactory {
       case GZIP:
         return new GzipCompressor();
 
+      case DELTA:
+        return DeltaCompressor.INSTANCE;
+
+      case DELTADELTA:
+        return DeltaDeltaCompressor.INSTANCE;
+
       default:
         throw new IllegalArgumentException("Illegal compressor name " + 
compressionType);
     }
@@ -102,8 +108,14 @@ public class ChunkCompressorFactory {
       case GZIP:
         return new GzipDecompressor();
 
+      case DELTA:
+        return DeltaDecompressor.INSTANCE;
+
+      case DELTADELTA:
+        return DeltaDeltaDecompressor.INSTANCE;
+
       default:
-        throw new IllegalArgumentException("Illegal compressor name " + 
compressionType);
+        throw new IllegalArgumentException("Illegal decompressor name " + 
compressionType);
     }
   }
 }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/DeltaCompressor.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/DeltaCompressor.java
new file mode 100644
index 00000000000..03b8b8fcf50
--- /dev/null
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/DeltaCompressor.java
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.io.compression;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import net.jpountz.lz4.LZ4Factory;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.compression.ChunkCompressor;
+
+
+/**
+ * Implementation of {@link ChunkCompressor} using delta compression with LZ4.
+ * The delta values are further compressed using LZ4.
+ */
+class DeltaCompressor implements ChunkCompressor {
+
+  static final DeltaCompressor INSTANCE = new DeltaCompressor();
+  static final LZ4Factory LZ4_FACTORY = LZ4Factory.fastestInstance();
+  private static final byte INT_FLAG = 0;
+  private static final byte LONG_FLAG = 1;
+
+  private DeltaCompressor() {
+  }
+
+  @Override
+  public int compress(ByteBuffer inUncompressed, ByteBuffer outCompressed)
+      throws IOException {
+    // Store original position to calculate compressed size
+    int outStartPosition = outCompressed.position();
+
+    int remaining = inUncompressed.remaining();
+    if (remaining % Integer.BYTES != 0) {
+      throw new IOException("Invalid input size: must be multiple of 4 bytes 
for INT or 8 bytes for LONG");
+    }
+    if (remaining % Long.BYTES != 0) {
+      outCompressed.put(INT_FLAG);
+      return compressForInt(inUncompressed, outCompressed, outStartPosition);
+    }
+    outCompressed.put(LONG_FLAG);
+    return compressForLong(inUncompressed, outCompressed, outStartPosition);
+  }
+
+  private int compressForInt(ByteBuffer inUncompressed, ByteBuffer 
outCompressed, int outStartPosition)
+      throws IOException {
+    // Count the number of int values to be compressed.
+    int numIntegers = inUncompressed.remaining() / Integer.BYTES;
+    if (numIntegers == 0) {
+      outCompressed.putInt(0);
+      outCompressed.flip();
+      return 5; // 1 byte flag + 4 bytes for numIntegers
+    }
+
+    // Store the number of integer values at the start
+    outCompressed.putInt(numIntegers);
+
+    // Store the first value as-is
+    int prevValue = inUncompressed.getInt();
+    outCompressed.putInt(prevValue);
+
+    if (numIntegers == 1) {
+      outCompressed.flip();
+      return outCompressed.limit() - outStartPosition;
+    }
+
+    // Create temporary buffer for delta values before LZ4 compression
+    ByteBuffer deltaBuffer = ByteBuffer.allocate((numIntegers - 1) * 
Integer.BYTES);
+
+    // Calculate deltas
+    for (int i = 1; i < numIntegers; i++) {
+      int currentValue = inUncompressed.getInt();
+      int delta = currentValue - prevValue;
+      deltaBuffer.putInt(delta);
+      prevValue = currentValue;
+    }
+
+    // Prepare the delta buffer for reading
+    deltaBuffer.flip();
+
+    // Reserve space for compressed size
+    outCompressed.position(outCompressed.position() + Integer.BYTES);
+    int compressedStart = outCompressed.position();
+
+    // Compress delta values using LZ4
+    LZ4_FACTORY.fastCompressor().compress(deltaBuffer, outCompressed);
+
+    // Record compressed size
+    int compressedSize = outCompressed.position() - compressedStart;
+    outCompressed.putInt(compressedStart - Integer.BYTES, compressedSize);
+
+    // Make buffer ready for reading
+    outCompressed.flip();
+    return outCompressed.limit() - outStartPosition;
+  }
+
+  private int compressForLong(ByteBuffer inUncompressed, ByteBuffer 
outCompressed, int outStartPosition)
+      throws IOException {
+    // Count the number of long values to be compressed.
+    int numLongs = inUncompressed.remaining() / Long.BYTES;
+    if (numLongs == 0) {
+      outCompressed.putInt(0);
+      outCompressed.flip();
+      return 5; // 1 byte flag + 4 bytes for numLongs
+    }
+
+    // Store the number of long values at the start
+    outCompressed.putInt(numLongs);
+
+    // Store the first value as-is
+    long prevValue = inUncompressed.getLong();
+    outCompressed.putLong(prevValue);
+
+    if (numLongs == 1) {
+      outCompressed.flip();
+      return outCompressed.limit() - outStartPosition;
+    }
+
+    // Create temporary buffer for delta values before LZ4 compression
+    ByteBuffer deltaBuffer = ByteBuffer.allocate((numLongs - 1) * Long.BYTES);
+
+    // Calculate deltas
+    for (int i = 1; i < numLongs; i++) {
+      long currentValue = inUncompressed.getLong();
+      long delta = currentValue - prevValue;
+      deltaBuffer.putLong(delta);
+      prevValue = currentValue;
+    }
+
+    // Prepare the delta buffer for reading
+    deltaBuffer.flip();
+
+    // Reserve space for compressed size
+    outCompressed.position(outCompressed.position() + Integer.BYTES);
+    int compressedStart = outCompressed.position();
+
+    // Compress delta values using LZ4
+    LZ4_FACTORY.fastCompressor().compress(deltaBuffer, outCompressed);
+
+    // Record compressed size
+    int compressedSize = outCompressed.position() - compressedStart;
+    outCompressed.putInt(compressedStart - Integer.BYTES, compressedSize);
+
+    // Make buffer ready for reading
+    outCompressed.flip();
+    return outCompressed.limit() - outStartPosition;
+  }
+
+  @Override
+  public int maxCompressedSize(int uncompressedSize) {
+    // Add 1 byte for type flag
+    int baseSize = 1;
+    if (uncompressedSize % Integer.BYTES != 0) {
+      throw new IllegalArgumentException("Invalid input size: must be multiple 
of 4 bytes for INT or 8 bytes for LONG");
+    }
+    if (uncompressedSize % Long.BYTES != 0) {
+      int numIntegers = uncompressedSize / Integer.BYTES;
+      if (numIntegers == 0) {
+        return baseSize + 4; // flag + numIntegers
+      }
+      if (numIntegers == 1) {
+        return baseSize + 8; // flag + numIntegers + one int value
+      }
+      int deltaSize = (numIntegers - 1) * Long.BYTES;
+      return baseSize + 12 + LZ4_FACTORY.fastCompressor()
+          .maxCompressedLength(deltaSize); // flag + numIntegers + first int 
value + compressed size + compressed data
+    }
+    int numLongs = uncompressedSize / Long.BYTES;
+    if (numLongs == 0) {
+      return baseSize + 4; // flag + numLongs
+    }
+    if (numLongs == 1) {
+      return baseSize + 12; // flag + numLongs + one long value
+    }
+    int deltaSize = (numLongs - 1) * Long.BYTES;
+    return baseSize + 16 + LZ4_FACTORY.fastCompressor()
+        .maxCompressedLength(deltaSize); // flag + numLongs + first value + 
compressed size + compressed data
+  }
+
+  @Override
+  public ChunkCompressionType compressionType() {
+    return ChunkCompressionType.DELTA;
+  }
+}
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/DeltaDecompressor.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/DeltaDecompressor.java
new file mode 100644
index 00000000000..bc873e83320
--- /dev/null
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/DeltaDecompressor.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.io.compression;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import net.jpountz.lz4.LZ4Factory;
+import org.apache.pinot.segment.spi.compression.ChunkDecompressor;
+
+
+/**
+ * Implementation of {@link ChunkDecompressor} for delta compression with LZ4.
+ * This decompressor reconstructs the original integer sequence from LZ4 
compressed delta encoded values.
+ */
+class DeltaDecompressor implements ChunkDecompressor {
+
+  static final DeltaDecompressor INSTANCE = new DeltaDecompressor();
+  private static final LZ4Factory LZ4_FACTORY = LZ4Factory.fastestInstance();
+  private static final byte LONG_FLAG = 1;
+
+  private DeltaDecompressor() {
+  }
+
+  @Override
+  public int decompress(ByteBuffer compressedInput, ByteBuffer 
decompressedOutput)
+      throws IOException {
+
+    // Read the type flag; any value other than LONG_FLAG is decoded as INT
+    byte flag = compressedInput.get();
+    if (flag != LONG_FLAG) {
+      return decompressForInt(compressedInput, decompressedOutput);
+    }
+    return decompressForLong(compressedInput, decompressedOutput);
+  }
+
+  private int decompressForInt(ByteBuffer compressedInput, ByteBuffer 
decompressedOutput)
+      throws IOException {
+    // Get number of integers
+    int numIntegers = compressedInput.getInt();
+    if (numIntegers == 0) {
+      decompressedOutput.flip();
+      return 0;
+    }
+
+    // Get first value
+    int prevValue = compressedInput.getInt();
+    decompressedOutput.putInt(prevValue);
+
+    if (numIntegers == 1) {
+      decompressedOutput.flip();
+      return Integer.BYTES;
+    }
+
+    // Get size of compressed delta values
+    int compressedSize = compressedInput.getInt();
+
+    // Create temporary buffer for decompressed deltas
+    ByteBuffer deltaBuffer = ByteBuffer.allocate((numIntegers - 1) * 
Integer.BYTES);
+
+    // Get compressed delta values position
+    ByteBuffer compressedDeltas = compressedInput.slice();
+    compressedDeltas.limit(compressedSize);
+
+    // Decompress delta values using LZ4
+    LZ4_FACTORY.safeDecompressor().decompress(compressedDeltas, deltaBuffer);
+    deltaBuffer.flip();
+
+    // Reconstruct remaining values
+    for (int i = 1; i < numIntegers; i++) {
+      int delta = deltaBuffer.getInt();
+      int currentValue = prevValue + delta;
+      decompressedOutput.putInt(currentValue);
+      prevValue = currentValue;
+    }
+
+    // Make buffer ready for reading
+    decompressedOutput.flip();
+    return numIntegers * Integer.BYTES;
+  }
+
+  private int decompressForLong(ByteBuffer compressedInput, ByteBuffer 
decompressedOutput)
+      throws IOException {
+    // Get number of longs
+    int numLongs = compressedInput.getInt();
+    if (numLongs == 0) {
+      decompressedOutput.flip();
+      return 0;
+    }
+
+    // Get first value
+    long prevValue = compressedInput.getLong();
+    decompressedOutput.putLong(prevValue);
+
+    if (numLongs == 1) {
+      decompressedOutput.flip();
+      return Long.BYTES;
+    }
+
+    // Get size of compressed delta values
+    int compressedSize = compressedInput.getInt();
+
+    // Create temporary buffer for decompressed deltas
+    ByteBuffer deltaBuffer = ByteBuffer.allocate((numLongs - 1) * Long.BYTES);
+
+    // Get compressed delta values position
+    ByteBuffer compressedDeltas = compressedInput.slice();
+    compressedDeltas.limit(compressedSize);
+
+    // Decompress delta values using LZ4
+    LZ4_FACTORY.safeDecompressor().decompress(compressedDeltas, deltaBuffer);
+    deltaBuffer.flip();
+
+    // Reconstruct remaining values
+    for (int i = 1; i < numLongs; i++) {
+      long delta = deltaBuffer.getLong();
+      long currentValue = prevValue + delta;
+      decompressedOutput.putLong(currentValue);
+      prevValue = currentValue;
+    }
+
+    // Make buffer ready for reading
+    decompressedOutput.flip();
+    return numLongs * Long.BYTES;
+  }
+
+  @Override
+  public int decompressedLength(ByteBuffer compressedInput) {
+    byte flag = compressedInput.get(compressedInput.position());
+    int numValues = compressedInput.getInt(compressedInput.position() + 1);
+    if (flag != LONG_FLAG) {
+      return numValues * Integer.BYTES;
+    }
+    return numValues * Long.BYTES;
+  }
+}
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/DeltaDeltaCompressor.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/DeltaDeltaCompressor.java
new file mode 100644
index 00000000000..394772ac883
--- /dev/null
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/DeltaDeltaCompressor.java
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.io.compression;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import net.jpountz.lz4.LZ4Factory;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.compression.ChunkCompressor;
+
+
+/**
+ * A delta-of-delta implementation of {@link ChunkCompressor}. The first value is stored as-is; the first
+ * delta and the subsequent deltas of deltas are written to a temporary buffer that is then compressed
+ * with LZ4. Inputs are treated as INT or LONG sequences depending on the input size.
+ */
+class DeltaDeltaCompressor implements ChunkCompressor {
+
+  static final DeltaDeltaCompressor INSTANCE = new DeltaDeltaCompressor();
+  static final LZ4Factory LZ4_FACTORY = LZ4Factory.fastestInstance();
+  private static final byte INT_FLAG = 0;
+  private static final byte LONG_FLAG = 1;
+
+  private DeltaDeltaCompressor() {
+  }
+
+  /**
+   * The compression works by:
+   * (1) Storing the first value as-is
+   * (2) Computing and storing the first delta (difference between second and 
first value)
+   * (3) For all subsequent values, storing the difference between consecutive 
deltas (delta of delta)
+   * (4) During decompression, the process is reversed to reconstruct the 
original values
+   *
+   * Data with the following characteristics benefits from delta of delta compression:
+   * The data is sorted
+   * The differences between consecutive values are relatively constant
+   * The data consists of integers (it's specifically designed for integer 
sequences)
+   * */
+  @Override
+  public int compress(ByteBuffer inUncompressed, ByteBuffer outCompressed)
+      throws IOException {
+    // Store original position to calculate compressed size
+    int outStartPosition = outCompressed.position();
+
+    int remaining = inUncompressed.remaining();
+    if (remaining % Integer.BYTES != 0) {
+      throw new IOException("Invalid input size: must be multiple of 4 bytes 
for INT or 8 bytes for LONG");
+    }
+    if (remaining % Long.BYTES != 0) {
+      return compressForInt(inUncompressed, outCompressed, outStartPosition);
+    }
+    return compressForLong(inUncompressed, outCompressed, outStartPosition);
+  }
+
+  private int compressForInt(ByteBuffer inUncompressed, ByteBuffer 
outCompressed, int startPosition)
+      throws IOException {
+    outCompressed.put(INT_FLAG);
+    // Get number of integers to compress
+    int numIntegers = inUncompressed.remaining() / Integer.BYTES;
+    if (numIntegers == 0) {
+      outCompressed.putInt(0);
+      outCompressed.flip();
+      return 5; // 1 byte flag + 4 bytes for numIntegers
+    }
+
+    // Store number of Integers at the start
+    outCompressed.putInt(numIntegers);
+
+    // Store first value as-is
+    int prevValue = inUncompressed.getInt();
+    outCompressed.putInt(prevValue);
+
+    if (numIntegers == 1) {
+      outCompressed.flip();
+      return outCompressed.limit() - startPosition;
+    }
+
+    // Create temporary buffer for delta values before LZ4 compression
+    ByteBuffer deltaBuffer = ByteBuffer.allocate((numIntegers - 1) * 
Integer.BYTES);
+
+    // Store first delta
+    int prevDelta = inUncompressed.getInt() - prevValue;
+    deltaBuffer.putInt(prevDelta);
+    prevValue += prevDelta;
+
+    // Calculate remaining deltas
+    for (int i = 2; i < numIntegers; i++) {
+      int currentValue = inUncompressed.getInt();
+      int currentDelta = currentValue - prevValue;
+      int deltaOfDelta = currentDelta - prevDelta;
+
+      deltaBuffer.putInt(deltaOfDelta);
+
+      prevValue = currentValue;
+      prevDelta = currentDelta;
+    }
+
+    // Prepare delta buffer for reading
+    deltaBuffer.flip();
+
+    // Reserve space for compressed size
+    outCompressed.position(outCompressed.position() + Integer.BYTES);
+    int compressedStart = outCompressed.position();
+
+    // Compress delta values using LZ4
+    LZ4_FACTORY.fastCompressor().compress(deltaBuffer, outCompressed);
+
+    // Record compressed size
+    int compressedSize = outCompressed.position() - compressedStart;
+    outCompressed.putInt(compressedStart - Integer.BYTES, compressedSize);
+
+    // Make buffer ready for reading
+    outCompressed.flip();
+    return outCompressed.limit() - startPosition;
+  }
+
+  private int compressForLong(ByteBuffer inUncompressed, ByteBuffer 
outCompressed, int startPosition)
+      throws IOException {
+    outCompressed.put(LONG_FLAG);
+    // Get number of longs to compress
+    int numLongs = inUncompressed.remaining() / Long.BYTES;
+    if (numLongs == 0) {
+      outCompressed.putInt(0);
+      outCompressed.flip();
+      return 5; // 1 byte flag + 4 bytes for numLongs
+    }
+
+    // Store number of longs at the start
+    outCompressed.putInt(numLongs);
+
+    // Store first value as-is
+    long prevValue = inUncompressed.getLong();
+    outCompressed.putLong(prevValue);
+
+    if (numLongs == 1) {
+      outCompressed.flip();
+      return outCompressed.limit() - startPosition;
+    }
+
+    // Create temporary buffer for delta values before LZ4 compression
+    ByteBuffer deltaBuffer = ByteBuffer.allocate((numLongs - 1) * Long.BYTES);
+
+    // Store first delta
+    long prevDelta = inUncompressed.getLong() - prevValue;
+    deltaBuffer.putLong(prevDelta);
+    prevValue += prevDelta;
+
+    // Calculate remaining deltas
+    for (int i = 2; i < numLongs; i++) {
+      long currentValue = inUncompressed.getLong();
+      long currentDelta = currentValue - prevValue;
+      long deltaOfDelta = currentDelta - prevDelta;
+
+      deltaBuffer.putLong(deltaOfDelta);
+
+      prevValue = currentValue;
+      prevDelta = currentDelta;
+    }
+
+    // Prepare delta buffer for reading
+    deltaBuffer.flip();
+
+    // Reserve space for compressed size
+    outCompressed.position(outCompressed.position() + Integer.BYTES);
+    int compressedStart = outCompressed.position();
+
+    // Compress delta values using LZ4
+    LZ4_FACTORY.fastCompressor().compress(deltaBuffer, outCompressed);
+
+    // Record compressed size
+    int compressedSize = outCompressed.position() - compressedStart;
+    outCompressed.putInt(compressedStart - Integer.BYTES, compressedSize);
+
+    // Make buffer ready for reading
+    outCompressed.flip();
+    return outCompressed.limit() - startPosition;
+  }
+
+  @Override
+  public int maxCompressedSize(int uncompressedSize) {
+    // Add 1 byte for int or long flag
+    int flagSize = 1;
+
+    if (uncompressedSize % Integer.BYTES != 0) {
+      throw new IllegalArgumentException("Invalid input size: must be multiple 
of 4 bytes for INT or 8 bytes for LONG");
+    }
+    if (uncompressedSize % Long.BYTES != 0) {
+      int numIntegers = uncompressedSize / Integer.BYTES;
+      if (numIntegers == 0) {
+        return flagSize + 4; // flag + num of Integers
+      }
+      if (numIntegers == 1) {
+        return flagSize + 8; // flag + num of Integers + one integers value
+      }
+      int deltaSize = (numIntegers - 1) * Integer.BYTES;
+      // flag + num of Integers + first value + compressed size + compressed delta
+      return flagSize + 12 + 
LZ4_FACTORY.fastCompressor().maxCompressedLength(deltaSize);
+    }
+    int numLongs = uncompressedSize / Long.BYTES;
+    if (numLongs == 0) {
+      return flagSize + 4; // flag + num of Longs
+    }
+    if (numLongs == 1) {
+      return flagSize + 12; // flag + num of Longs + one long value
+    }
+    int deltaSize = (numLongs - 1) * Long.BYTES;
+    // flag + num of Longs + first value + compressed size + compressed delta
+    return flagSize + 16 + 
LZ4_FACTORY.fastCompressor().maxCompressedLength(deltaSize);
+  }
+
+  @Override
+  public ChunkCompressionType compressionType() {
+    return ChunkCompressionType.DELTADELTA;
+  }
+}
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/DeltaDeltaDecompressor.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/DeltaDeltaDecompressor.java
new file mode 100644
index 00000000000..3016a8c6c77
--- /dev/null
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/DeltaDeltaDecompressor.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.io.compression;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import net.jpountz.lz4.LZ4Factory;
+import org.apache.pinot.segment.spi.compression.ChunkDecompressor;
+
+
+/**
+ * Implementation of {@link ChunkDecompressor} for delta-of-delta compression with LZ4. It decompresses the
+ * LZ4-compressed delta buffer and reconstructs the original INT or LONG sequence from the first value, the
+ * first delta and the subsequent deltas of deltas.
+ */
+class DeltaDeltaDecompressor implements ChunkDecompressor {
+
+  static final DeltaDeltaDecompressor INSTANCE = new DeltaDeltaDecompressor();
+  static final LZ4Factory LZ4_FACTORY = LZ4Factory.fastestInstance();
+  private static final byte LONG_FLAG = 1;
+
+  private DeltaDeltaDecompressor() {
+  }
+
+  @Override
+  public int decompress(ByteBuffer compressedInput, ByteBuffer 
decompressedOutput)
+      throws IOException {
+    // Read the type flag; any value other than LONG_FLAG is decoded as INT
+    byte flag = compressedInput.get();
+    if (flag != LONG_FLAG) {
+      return decompressForInt(compressedInput, decompressedOutput);
+    }
+    return decompressForLong(compressedInput, decompressedOutput);
+  }
+
+  private int decompressForInt(ByteBuffer compressedInput, ByteBuffer 
decompressedOutput) {
+    // Get number of integers
+    int numIntegers = compressedInput.getInt();
+    if (numIntegers == 0) {
+      decompressedOutput.flip();
+      return 0;
+    }
+
+    // Get first value
+    int prevValue = compressedInput.getInt();
+    decompressedOutput.putInt(prevValue);
+
+    if (numIntegers == 1) {
+      decompressedOutput.flip();
+      return Integer.BYTES;
+    }
+
+    // Get size of compressed delta values
+    int compressedSize = compressedInput.getInt();
+
+    // Create temporary buffer for decompressed deltas
+    ByteBuffer deltaBuffer = ByteBuffer.allocate((numIntegers - 1) * 
Integer.BYTES);
+
+    // Get compressed delta values position
+    ByteBuffer compressedDeltas = compressedInput.slice();
+    compressedDeltas.limit(compressedSize);
+
+    // Decompress delta values using LZ4
+    LZ4_FACTORY.safeDecompressor().decompress(compressedDeltas, deltaBuffer);
+    deltaBuffer.flip();
+
+    // Get first delta
+    int prevDelta = deltaBuffer.getInt();
+    int currentValue = prevValue + prevDelta;
+    decompressedOutput.putInt(currentValue);
+    prevValue = currentValue;
+
+    // Decompress remaining values
+    for (int i = 2; i < numIntegers; i++) {
+      int deltaOfDelta = deltaBuffer.getInt();
+      int currentDelta = prevDelta + deltaOfDelta;
+      currentValue = prevValue + currentDelta;
+
+      decompressedOutput.putInt(currentValue);
+
+      prevValue = currentValue;
+      prevDelta = currentDelta;
+    }
+
+    // Make buffer ready for reading
+    decompressedOutput.flip();
+    return numIntegers * Integer.BYTES;
+  }
+
+  private int decompressForLong(ByteBuffer compressedInput, ByteBuffer 
decompressedOutput) {
+    // Get number of longs
+    int numLongs = compressedInput.getInt();
+    if (numLongs == 0) {
+      decompressedOutput.flip();
+      return 0;
+    }
+
+    // Get first value
+    long prevValue = compressedInput.getLong();
+    decompressedOutput.putLong(prevValue);
+
+    if (numLongs == 1) {
+      decompressedOutput.flip();
+      return Long.BYTES;
+    }
+
+    // Get size of compressed delta values
+    int compressedSize = compressedInput.getInt();
+
+    // Create temporary buffer for decompressed deltas
+    ByteBuffer deltaBuffer = ByteBuffer.allocate((numLongs - 1) * Long.BYTES);
+
+    // Get compressed delta values position
+    ByteBuffer compressedDeltas = compressedInput.slice();
+    compressedDeltas.limit(compressedSize);
+
+    // Decompress delta values using LZ4
+    LZ4_FACTORY.safeDecompressor().decompress(compressedDeltas, deltaBuffer);
+    deltaBuffer.flip();
+
+    // Get first delta
+    long prevDelta = deltaBuffer.getLong();
+    long currentValue = prevValue + prevDelta;
+    decompressedOutput.putLong(currentValue);
+    prevValue = currentValue;
+
+    // Decompress remaining values
+    for (int i = 2; i < numLongs; i++) {
+      long deltaOfDelta = deltaBuffer.getLong();
+      long currentDelta = prevDelta + deltaOfDelta;
+      currentValue = prevValue + currentDelta;
+
+      decompressedOutput.putLong(currentValue);
+
+      prevValue = currentValue;
+      prevDelta = currentDelta;
+    }
+
+    // Make buffer ready for reading
+    decompressedOutput.flip();
+    return numLongs * Long.BYTES;
+  }
+
+  @Override
+  public int decompressedLength(ByteBuffer compressedInput) {
+    byte flag = compressedInput.get(compressedInput.position());
+    int numValues = compressedInput.getInt(compressedInput.position() + 1);
+    if (flag != LONG_FLAG) {
+      return numValues * Integer.BYTES;
+    }
+    return numValues * Long.BYTES;
+  }
+}
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkForwardIndexReader.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkForwardIndexReader.java
index b11a991d33a..0ef13d23e63 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkForwardIndexReader.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkForwardIndexReader.java
@@ -217,7 +217,15 @@ public abstract class BaseChunkForwardIndexReader 
implements ForwardIndexReader<
     decompressedBuffer.clear();
 
     try {
-      
_chunkDecompressor.decompress(_dataBuffer.toDirectByteBuffer(chunkPosition, 
chunkSize), decompressedBuffer);
+      if (_compressionType == ChunkCompressionType.DELTA || _compressionType 
== ChunkCompressionType.DELTADELTA) {
+        // For delta-based compression, pre-size the output using 
decompressor's length calculation.
+        ByteBuffer compressedBuffer = 
_dataBuffer.toDirectByteBuffer(chunkPosition, chunkSize);
+        int decompressedSize = 
_chunkDecompressor.decompressedLength(compressedBuffer);
+        decompressedBuffer = ByteBuffer.allocateDirect(decompressedSize);
+        _chunkDecompressor.decompress(compressedBuffer, decompressedBuffer);
+      } else {
+        
_chunkDecompressor.decompress(_dataBuffer.toDirectByteBuffer(chunkPosition, 
chunkSize), decompressedBuffer);
+      }
     } catch (IOException e) {
       LOGGER.error("Exception caught while decompressing data chunk", e);
       throw new RuntimeException(e);
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/io/compression/DeltaCompressionTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/io/compression/DeltaCompressionTest.java
new file mode 100644
index 00000000000..b5bd8eb5ab8
--- /dev/null
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/io/compression/DeltaCompressionTest.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.io.compression;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.compression.ChunkCompressor;
+import org.apache.pinot.segment.spi.compression.ChunkDecompressor;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+/**
+ * Round-trip tests for the {@link ChunkCompressionType#DELTA} chunk codec over LONG and INT payloads.
+ *
+ * <p>Every case writes its values into a direct buffer, compresses it, asks the decompressor for the decompressed
+ * length, decompresses into a buffer of exactly that size and checks that the values are read back in order.
+ */
+public class DeltaCompressionTest {
+  /** Passed as the expected compressed size by cases that do not pin it. */
+  private static final int ANY_COMPRESSED_SIZE = -1;
+
+  @Test
+  public void testRoundTripEmpty()
+      throws IOException {
+    // An empty chunk is expected to compress to 5 bytes.
+    assertLongRoundTrip(new long[]{}, 5);
+  }
+
+  @Test
+  public void testRoundTripSingleValue()
+      throws IOException {
+    // 13 = the 5 bytes of an empty chunk + 8 bytes for the single LONG.
+    assertLongRoundTrip(new long[]{10L}, 13);
+  }
+
+  @Test
+  public void testRoundTripMultiValues()
+      throws IOException {
+    // The last two values sit next to Long.MIN_VALUE / Long.MAX_VALUE, so the differences between neighbours
+    // overflow a long; the round trip must still reproduce them exactly.
+    assertLongRoundTrip(
+        new long[]{10L, 12L, 15L, 21L, 30L, 30L, 31L, Long.MIN_VALUE + 10L, Long.MAX_VALUE - 10L},
+        ANY_COMPRESSED_SIZE);
+  }
+
+  @Test
+  public void testRoundTripEmptyInt()
+      throws IOException {
+    // An empty chunk is expected to compress to 5 bytes.
+    assertIntRoundTrip(new int[]{}, 5);
+  }
+
+  @Test
+  public void testRoundTripSingleValueInt()
+      throws IOException {
+    // 9 = the 5 bytes of an empty chunk + 4 bytes for the single INT.
+    assertIntRoundTrip(new int[]{10}, 9);
+  }
+
+  @Test
+  public void testRoundTripMultiValuesInt()
+      throws IOException {
+    // Same overflow scenario as the LONG variant, at the INT boundaries.
+    assertIntRoundTrip(
+        new int[]{10, 12, 15, 21, 30, 30, 31, Integer.MIN_VALUE + 10, Integer.MAX_VALUE - 10},
+        ANY_COMPRESSED_SIZE);
+  }
+
+  /**
+   * Round-trips {@code values} as 8-byte longs and checks that they are read back unchanged and in order.
+   *
+   * @param values values to encode
+   * @param expectedCompressedSize exact compressed size to assert, or {@link #ANY_COMPRESSED_SIZE} to skip the check
+   */
+  private static void assertLongRoundTrip(long[] values, int expectedCompressedSize)
+      throws IOException {
+    ByteBuffer input = ByteBuffer.allocateDirect(values.length * Long.BYTES);
+    for (long value : values) {
+      input.putLong(value);
+    }
+    input.flip();
+
+    ByteBuffer decompressed = roundTrip(input, expectedCompressedSize);
+    for (long expected : values) {
+      assertEquals(decompressed.getLong(), expected);
+    }
+  }
+
+  /**
+   * Round-trips {@code values} as 4-byte ints and checks that they are read back unchanged and in order.
+   *
+   * @param values values to encode
+   * @param expectedCompressedSize exact compressed size to assert, or {@link #ANY_COMPRESSED_SIZE} to skip the check
+   */
+  private static void assertIntRoundTrip(int[] values, int expectedCompressedSize)
+      throws IOException {
+    ByteBuffer input = ByteBuffer.allocateDirect(values.length * Integer.BYTES);
+    for (int value : values) {
+      input.putInt(value);
+    }
+    input.flip();
+
+    ByteBuffer decompressed = roundTrip(input, expectedCompressedSize);
+    for (int expected : values) {
+      assertEquals(decompressed.getInt(), expected);
+    }
+  }
+
+  /**
+   * Compresses {@code input} with the DELTA codec and decompresses it again, asserting the reported sizes.
+   *
+   * <p>The returned buffer is handed back exactly as {@code decompress()} left it. Callers read from it without
+   * flipping, so the decompressor is expected to leave the buffer ready for reading.
+   *
+   * @param input flipped buffer holding the raw values; its position and limit are not modified
+   * @param expectedCompressedSize exact compressed size to assert, or {@link #ANY_COMPRESSED_SIZE} to skip the check
+   * @return the buffer the chunk was decompressed into
+   */
+  private static ByteBuffer roundTrip(ByteBuffer input, int expectedCompressedSize)
+      throws IOException {
+    int inputSize = input.remaining();
+    try (ChunkCompressor compressor = ChunkCompressorFactory.getCompressor(ChunkCompressionType.DELTA);
+        ChunkDecompressor decompressor = ChunkCompressorFactory.getDecompressor(ChunkCompressionType.DELTA)) {
+      ByteBuffer compressed = ByteBuffer.allocateDirect(compressor.maxCompressedSize(inputSize));
+      int compressedSize = compressor.compress(input.slice(), compressed);
+      if (expectedCompressedSize != ANY_COMPRESSED_SIZE) {
+        assertEquals(compressedSize, expectedCompressedSize);
+      }
+
+      // The length derived from the chunk header must match the raw input size for every case.
+      int decompressedSize = decompressor.decompressedLength(compressed);
+      assertEquals(decompressedSize, inputSize);
+
+      ByteBuffer decompressed = ByteBuffer.allocateDirect(decompressedSize);
+      int actualSize = decompressor.decompress(compressed, decompressed);
+      assertEquals(actualSize, inputSize);
+      return decompressed;
+    }
+  }
+}
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/io/compression/DeltaDeltaCompressionTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/io/compression/DeltaDeltaCompressionTest.java
new file mode 100644
index 00000000000..5411d561964
--- /dev/null
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/io/compression/DeltaDeltaCompressionTest.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.io.compression;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.compression.ChunkCompressor;
+import org.apache.pinot.segment.spi.compression.ChunkDecompressor;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+/**
+ * Round-trip tests for the {@link ChunkCompressionType#DELTADELTA} chunk codec over LONG and INT payloads.
+ *
+ * <p>Every case writes its values into a direct buffer, compresses it, asks the decompressor for the decompressed
+ * length, decompresses into a buffer of exactly that size and checks that the values are read back in order.
+ */
+public class DeltaDeltaCompressionTest {
+
+  @Test
+  public void testRoundTripEmpty()
+      throws IOException {
+    assertLongRoundTrip(new long[]{});
+  }
+
+  @Test
+  public void testRoundTripSingleValue()
+      throws IOException {
+    assertLongRoundTrip(new long[]{100L});
+  }
+
+  @Test
+  public void testRoundTripMultiValues()
+      throws IOException {
+    // Consecutive differences are 5, 6, 7, ... 12, i.e. the delta-of-delta is a constant 1.
+    assertLongRoundTrip(new long[]{100L, 105L, 111L, 118L, 126L, 135L, 145L, 156L, 168L});
+  }
+
+  @Test
+  public void testRoundTripEmptyInt()
+      throws IOException {
+    assertIntRoundTrip(new int[]{});
+  }
+
+  @Test
+  public void testRoundTripSingleValueInt()
+      throws IOException {
+    assertIntRoundTrip(new int[]{100});
+  }
+
+  @Test
+  public void testRoundTripMultiValuesInt()
+      throws IOException {
+    // Same series as the LONG variant: the delta-of-delta is a constant 1.
+    assertIntRoundTrip(new int[]{100, 105, 111, 118, 126, 135, 145, 156, 168});
+  }
+
+  /** Round-trips {@code values} as 8-byte longs and checks that they are read back unchanged and in order. */
+  private static void assertLongRoundTrip(long[] values)
+      throws IOException {
+    ByteBuffer input = ByteBuffer.allocateDirect(values.length * Long.BYTES);
+    for (long value : values) {
+      input.putLong(value);
+    }
+    input.flip();
+
+    ByteBuffer decompressed = roundTrip(input);
+    for (long expected : values) {
+      assertEquals(decompressed.getLong(), expected);
+    }
+  }
+
+  /** Round-trips {@code values} as 4-byte ints and checks that they are read back unchanged and in order. */
+  private static void assertIntRoundTrip(int[] values)
+      throws IOException {
+    ByteBuffer input = ByteBuffer.allocateDirect(values.length * Integer.BYTES);
+    for (int value : values) {
+      input.putInt(value);
+    }
+    input.flip();
+
+    ByteBuffer decompressed = roundTrip(input);
+    for (int expected : values) {
+      assertEquals(decompressed.getInt(), expected);
+    }
+  }
+
+  /**
+   * Compresses {@code input} with the DELTADELTA codec and decompresses it again, asserting the reported sizes.
+   *
+   * <p>The returned buffer is handed back exactly as {@code decompress()} left it. Callers read from it without
+   * flipping, so the decompressor is expected to leave the buffer ready for reading.
+   *
+   * @param input flipped buffer holding the raw values; its position and limit are not modified
+   * @return the buffer the chunk was decompressed into
+   */
+  private static ByteBuffer roundTrip(ByteBuffer input)
+      throws IOException {
+    int inputSize = input.remaining();
+    try (ChunkCompressor compressor = ChunkCompressorFactory.getCompressor(ChunkCompressionType.DELTADELTA);
+        ChunkDecompressor decompressor = ChunkCompressorFactory.getDecompressor(ChunkCompressionType.DELTADELTA)) {
+      ByteBuffer compressed = ByteBuffer.allocateDirect(compressor.maxCompressedSize(inputSize));
+      compressor.compress(input.slice(), compressed);
+
+      // The length derived from the chunk header must match the raw input size for every case.
+      int decompressedSize = decompressor.decompressedLength(compressed);
+      assertEquals(decompressedSize, inputSize);
+
+      ByteBuffer decompressed = ByteBuffer.allocateDirect(decompressedSize);
+      int actualSize = decompressor.decompress(compressed, decompressed);
+      assertEquals(actualSize, inputSize);
+      return decompressed;
+    }
+  }
+}
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriterTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriterTest.java
index b2385edc270..b238bf1917f 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriterTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriterTest.java
@@ -67,7 +67,9 @@ public class VarByteChunkSVForwardIndexWriterTest implements 
PinotBuffersAfterMe
     int[] numbersOfDocs = {10, 1000};
     int[][] entryLengths = {{1, 1}, {0, 10}, {0, 100}, {100, 100}, {900, 
1000}};
     int[] versions = {2, 3};
-    return 
Arrays.stream(ChunkCompressionType.values()).flatMap(chunkCompressionType -> 
IntStream.of(versions).boxed()
+    return Arrays.stream(ChunkCompressionType.values())
+        .filter(t -> t != ChunkCompressionType.DELTA && t != 
ChunkCompressionType.DELTADELTA)
+        .flatMap(chunkCompressionType -> IntStream.of(versions).boxed()
         .flatMap(version -> IntStream.of(numbersOfDocs).boxed().flatMap(
             totalDocs -> IntStream.of(numDocsPerChunks).boxed()
                 .flatMap(numDocsPerChunk -> 
Arrays.stream(entryLengths).map(lengths -> new Object[]{
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueFixedByteRawIndexCreatorTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueFixedByteRawIndexCreatorTest.java
index 6e1b9d6e819..6c958c1a014 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueFixedByteRawIndexCreatorTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueFixedByteRawIndexCreatorTest.java
@@ -59,6 +59,7 @@ public class MultiValueFixedByteRawIndexCreatorTest 
implements PinotBuffersAfter
   @DataProvider(name = "compressionTypes")
   public Object[][] compressionTypes() { // each row is {compression type, writer version in 2..5}
     return Arrays.stream(ChunkCompressionType.values())
+        .filter(t -> t != ChunkCompressionType.DELTA && t != 
ChunkCompressionType.DELTADELTA) // delta codecs are left out of this matrix; NOTE(review): presumably because they only handle fixed-width INT/LONG chunks - confirm
         .flatMap(ct -> IntStream.rangeClosed(2, 5).boxed().map(writerVersion 
-> new Object[]{ct, writerVersion}))
         .toArray(Object[][]::new);
   }
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java
index 96b973cdab2..2e14bcd3193 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java
@@ -61,6 +61,7 @@ public class MultiValueVarByteRawIndexCreatorTest implements 
PinotBuffersAfterMe
   @DataProvider
   public Object[][] params() {
     return Arrays.stream(ChunkCompressionType.values())
+        .filter(t -> t != ChunkCompressionType.DELTA && t != 
ChunkCompressionType.DELTADELTA)
         .flatMap(chunkCompressionType -> IntStream.rangeClosed(2, 5)
             .boxed()
             .flatMap(writerVersion -> IntStream.of(10, 100)
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/FixedByteChunkSVForwardIndexTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/FixedByteChunkSVForwardIndexTest.java
index 3587345435e..2a2f3d589f1 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/FixedByteChunkSVForwardIndexTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/FixedByteChunkSVForwardIndexTest.java
@@ -56,6 +56,7 @@ public class FixedByteChunkSVForwardIndexTest implements 
PinotBuffersAfterMethod
   @DataProvider(name = "combinations")
   public static Object[][] combinations() {
     return Arrays.stream(ChunkCompressionType.values())
+        .filter(t -> t != ChunkCompressionType.DELTA && t != 
ChunkCompressionType.DELTADELTA)
         .flatMap(chunkCompressionType -> IntStream.of(2, 3, 4)
             .mapToObj(version -> new Object[]{chunkCompressionType, version}))
         .toArray(Object[][]::new);
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressionType.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressionType.java
index 79c678c2609..d7bb4086719 100644
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressionType.java
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressionType.java
@@ -19,7 +19,7 @@
 package org.apache.pinot.segment.spi.compression;
 
 public enum ChunkCompressionType {
-  PASS_THROUGH(0), SNAPPY(1), ZSTANDARD(2), LZ4(3), LZ4_LENGTH_PREFIXED(4), 
GZIP(5);
+  PASS_THROUGH(0), SNAPPY(1), ZSTANDARD(2), LZ4(3), LZ4_LENGTH_PREFIXED(4), 
GZIP(5), DELTA(6), DELTADELTA(7);
 
   private static final ChunkCompressionType[] VALUES = values();
 
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java
index c34f0661b32..a4324a40711 100644
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java
@@ -132,6 +132,14 @@ public class ForwardIndexConfig extends IndexConfig {
           _chunkCompressionType = ChunkCompressionType.GZIP;
           _dictIdCompressionType = null;
           break;
+        case DELTA:
+          _chunkCompressionType = ChunkCompressionType.DELTA;
+          _dictIdCompressionType = null;
+          break;
+        case DELTADELTA:
+          _chunkCompressionType = ChunkCompressionType.DELTADELTA;
+          _dictIdCompressionType = null;
+          break;
         case MV_ENTRY_DICT:
           _dictIdCompressionType = DictIdCompressionType.MV_ENTRY_DICT;
           _chunkCompressionType = null;
@@ -177,6 +185,10 @@ public class ForwardIndexConfig extends IndexConfig {
           return CompressionCodec.ZSTANDARD;
         case LZ4:
           return CompressionCodec.LZ4;
+        case DELTA:
+          return CompressionCodec.DELTA;
+        case DELTADELTA:
+          return CompressionCodec.DELTADELTA;
         default:
           throw new IllegalStateException("Unsupported chunk compression type: 
" + chunkCompressionType);
       }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java
index 130c1db257c..e4f76c18445 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java
@@ -150,7 +150,10 @@ public class FieldConfig extends BaseJsonConfig {
     CLP(false, false),
     CLPV2(false, false),
     CLPV2_ZSTD(false, false),
-    CLPV2_LZ4(false, false);
+    CLPV2_LZ4(false, false),
+
+    DELTA(false, false),
+    DELTADELTA(false, false);
 
     //@formatter:on
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to