This is an automated email from the ASF dual-hosted git repository.
tingchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 687770ee3e1 [Timeseries-Storage] add Delta and DeltaDelta storage
encoding (#15258)
687770ee3e1 is described below
commit 687770ee3e1b1042e954bc8f7b3e9b53997ab64c
Author: Qiaochu Liu <[email protected]>
AuthorDate: Thu Nov 20 11:21:18 2025 -0800
[Timeseries-Storage] add Delta and DeltaDelta storage encoding (#15258)
* [Timeseries-Storage] add Delta and DeltaDelta storage encoding
* Add support for Int
* fix tests
* fix tests
---
.../io/compression/ChunkCompressorFactory.java | 14 +-
.../local/io/compression/DeltaCompressor.java | 199 ++++++++++++++++++
.../local/io/compression/DeltaDecompressor.java | 151 ++++++++++++++
.../local/io/compression/DeltaDeltaCompressor.java | 231 +++++++++++++++++++++
.../io/compression/DeltaDeltaDecompressor.java | 169 +++++++++++++++
.../forward/BaseChunkForwardIndexReader.java | 10 +-
.../local/io/compression/DeltaCompressionTest.java | 228 ++++++++++++++++++++
.../io/compression/DeltaDeltaCompressionTest.java | 200 ++++++++++++++++++
.../impl/VarByteChunkSVForwardIndexWriterTest.java | 4 +-
.../MultiValueFixedByteRawIndexCreatorTest.java | 1 +
.../MultiValueVarByteRawIndexCreatorTest.java | 1 +
.../forward/FixedByteChunkSVForwardIndexTest.java | 1 +
.../spi/compression/ChunkCompressionType.java | 2 +-
.../segment/spi/index/ForwardIndexConfig.java | 12 ++
.../apache/pinot/spi/config/table/FieldConfig.java | 5 +-
15 files changed, 1223 insertions(+), 5 deletions(-)
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ChunkCompressorFactory.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ChunkCompressorFactory.java
index 15def2f733b..33ef239583a 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ChunkCompressorFactory.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ChunkCompressorFactory.java
@@ -71,6 +71,12 @@ public class ChunkCompressorFactory {
case GZIP:
return new GzipCompressor();
+ case DELTA:
+ return DeltaCompressor.INSTANCE;
+
+ case DELTADELTA:
+ return DeltaDeltaCompressor.INSTANCE;
+
default:
throw new IllegalArgumentException("Illegal compressor name " +
compressionType);
}
@@ -102,8 +108,14 @@ public class ChunkCompressorFactory {
case GZIP:
return new GzipDecompressor();
+ case DELTA:
+ return DeltaDecompressor.INSTANCE;
+
+ case DELTADELTA:
+ return DeltaDeltaDecompressor.INSTANCE;
+
default:
- throw new IllegalArgumentException("Illegal compressor name " +
compressionType);
+ throw new IllegalArgumentException("Illegal decompressor name " +
compressionType);
}
}
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/DeltaCompressor.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/DeltaCompressor.java
new file mode 100644
index 00000000000..03b8b8fcf50
--- /dev/null
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/DeltaCompressor.java
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.io.compression;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import net.jpountz.lz4.LZ4Factory;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.compression.ChunkCompressor;
+
+
+/**
+ * Implementation of {@link ChunkCompressor} that delta-encodes fixed-width INT or LONG values and then compresses
+ * the deltas with LZ4.
+ *
+ * <p>Output layout, written starting at the current position of the output buffer:
+ * <ul>
+ *   <li>1 byte type flag: {@code 0} for INT, {@code 1} for LONG</li>
+ *   <li>4 bytes: number of values</li>
+ *   <li>the first value as-is (4 or 8 bytes), present when there is at least one value</li>
+ *   <li>4 bytes LZ4 block size followed by the LZ4 block holding the deltas between consecutive values, present
+ *   when there are at least two values</li>
+ * </ul>
+ *
+ * <p>The value width is inferred from the input length only: a length that is a multiple of 8 takes the LONG path,
+ * any other multiple of 4 takes the INT path. An INT chunk holding an even number of values is therefore encoded
+ * through the LONG path.
+ */
+class DeltaCompressor implements ChunkCompressor {
+
+  static final DeltaCompressor INSTANCE = new DeltaCompressor();
+  static final LZ4Factory LZ4_FACTORY = LZ4Factory.fastestInstance();
+  private static final byte INT_FLAG = 0;
+  private static final byte LONG_FLAG = 1;
+  // Type flag plus the 4-byte value count; written for every chunk, including an empty one.
+  private static final int HEADER_BYTES = Byte.BYTES + Integer.BYTES;
+
+  private DeltaCompressor() {
+  }
+
+  /**
+   * Delta-encodes and LZ4-compresses the remaining bytes of {@code inUncompressed} into {@code outCompressed}.
+   * On return {@code outCompressed} has been flipped.
+   *
+   * @param inUncompressed values to compress; the remaining byte count must be a multiple of 4
+   * @param outCompressed destination buffer, written from its current position
+   * @return number of bytes between the starting position of {@code outCompressed} and its limit after the flip
+   * @throws IOException if the remaining byte count of the input is not a multiple of 4
+   */
+  @Override
+  public int compress(ByteBuffer inUncompressed, ByteBuffer outCompressed)
+      throws IOException {
+    // Remember where writing started so the compressed size can be derived after the flip.
+    int outStartPosition = outCompressed.position();
+
+    int remaining = inUncompressed.remaining();
+    if (remaining % Integer.BYTES != 0) {
+      throw new IOException("Invalid input size: must be multiple of 4 bytes for INT or 8 bytes for LONG");
+    }
+    if (remaining % Long.BYTES != 0) {
+      outCompressed.put(INT_FLAG);
+      return compressForInt(inUncompressed, outCompressed, outStartPosition);
+    }
+    outCompressed.put(LONG_FLAG);
+    return compressForLong(inUncompressed, outCompressed, outStartPosition);
+  }
+
+  /** Writes the count, the first value and the LZ4-compressed deltas of a sequence of 4-byte values. */
+  private int compressForInt(ByteBuffer inUncompressed, ByteBuffer outCompressed, int outStartPosition) {
+    int numIntegers = inUncompressed.remaining() / Integer.BYTES;
+    outCompressed.putInt(numIntegers);
+    if (numIntegers == 0) {
+      return finish(outCompressed, outStartPosition);
+    }
+
+    // The first value is stored as-is; every later value is stored as the difference to its predecessor.
+    int prevValue = inUncompressed.getInt();
+    outCompressed.putInt(prevValue);
+    if (numIntegers == 1) {
+      return finish(outCompressed, outStartPosition);
+    }
+
+    ByteBuffer deltaBuffer = ByteBuffer.allocate((numIntegers - 1) * Integer.BYTES);
+    for (int i = 1; i < numIntegers; i++) {
+      int currentValue = inUncompressed.getInt();
+      deltaBuffer.putInt(currentValue - prevValue);
+      prevValue = currentValue;
+    }
+
+    writeLz4Block(deltaBuffer, outCompressed);
+    return finish(outCompressed, outStartPosition);
+  }
+
+  /** Writes the count, the first value and the LZ4-compressed deltas of a sequence of 8-byte values. */
+  private int compressForLong(ByteBuffer inUncompressed, ByteBuffer outCompressed, int outStartPosition) {
+    int numLongs = inUncompressed.remaining() / Long.BYTES;
+    outCompressed.putInt(numLongs);
+    if (numLongs == 0) {
+      return finish(outCompressed, outStartPosition);
+    }
+
+    // The first value is stored as-is; every later value is stored as the difference to its predecessor.
+    long prevValue = inUncompressed.getLong();
+    outCompressed.putLong(prevValue);
+    if (numLongs == 1) {
+      return finish(outCompressed, outStartPosition);
+    }
+
+    ByteBuffer deltaBuffer = ByteBuffer.allocate((numLongs - 1) * Long.BYTES);
+    for (int i = 1; i < numLongs; i++) {
+      long currentValue = inUncompressed.getLong();
+      deltaBuffer.putLong(currentValue - prevValue);
+      prevValue = currentValue;
+    }
+
+    writeLz4Block(deltaBuffer, outCompressed);
+    return finish(outCompressed, outStartPosition);
+  }
+
+  /**
+   * LZ4-compresses the filled {@code deltaBuffer} into {@code outCompressed}, preceded by the 4-byte size of the
+   * compressed block.
+   */
+  private static void writeLz4Block(ByteBuffer deltaBuffer, ByteBuffer outCompressed) {
+    deltaBuffer.flip();
+
+    // Skip the size slot first: the block size is only known once LZ4 has written the block.
+    int sizeSlot = outCompressed.position();
+    int blockStart = sizeSlot + Integer.BYTES;
+    outCompressed.position(blockStart);
+
+    LZ4_FACTORY.fastCompressor().compress(deltaBuffer, outCompressed);
+
+    // Absolute put: back-fills the size slot without moving the position.
+    outCompressed.putInt(sizeSlot, outCompressed.position() - blockStart);
+  }
+
+  /** Flips the output buffer and returns the distance from the start position to the new limit. */
+  private static int finish(ByteBuffer outCompressed, int outStartPosition) {
+    outCompressed.flip();
+    return outCompressed.limit() - outStartPosition;
+  }
+
+  /**
+   * Returns an upper bound for the number of bytes {@link #compress} writes for an input of the given size.
+   *
+   * @param uncompressedSize input size in bytes; must be a multiple of 4
+   * @return header and first-value bytes plus the LZ4 worst case for the delta payload
+   * @throws IllegalArgumentException if {@code uncompressedSize} is not a multiple of 4
+   */
+  @Override
+  public int maxCompressedSize(int uncompressedSize) {
+    if (uncompressedSize % Integer.BYTES != 0) {
+      throw new IllegalArgumentException("Invalid input size: must be multiple of 4 bytes for INT or 8 bytes for LONG");
+    }
+    // Same width inference as compress(): multiples of 8 are LONG, other multiples of 4 are INT.
+    int valueBytes = uncompressedSize % Long.BYTES != 0 ? Integer.BYTES : Long.BYTES;
+    int numValues = uncompressedSize / valueBytes;
+    if (numValues == 0) {
+      return HEADER_BYTES; // flag + number of values
+    }
+    if (numValues == 1) {
+      return HEADER_BYTES + valueBytes; // flag + number of values + first value
+    }
+    // The delta payload holds one delta of the value's own width per value after the first.
+    int deltaSize = (numValues - 1) * valueBytes;
+    // flag + number of values + first value + compressed size + compressed data
+    return HEADER_BYTES + valueBytes + Integer.BYTES + LZ4_FACTORY.fastCompressor().maxCompressedLength(deltaSize);
+  }
+
+  @Override
+  public ChunkCompressionType compressionType() {
+    return ChunkCompressionType.DELTA;
+  }
+}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/DeltaDecompressor.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/DeltaDecompressor.java
new file mode 100644
index 00000000000..bc873e83320
--- /dev/null
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/DeltaDecompressor.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.io.compression;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import net.jpountz.lz4.LZ4Factory;
+import org.apache.pinot.segment.spi.compression.ChunkDecompressor;
+
+
+/**
+ * Implementation of {@link ChunkDecompressor} for delta compression with LZ4.
+ * This decompressor reconstructs the original integer sequence from LZ4
compressed delta encoded values.
+ */
+class DeltaDecompressor implements ChunkDecompressor {
+
+ static final DeltaDecompressor INSTANCE = new DeltaDecompressor();
+ private static final LZ4Factory LZ4_FACTORY = LZ4Factory.fastestInstance();
+ private static final byte LONG_FLAG = 1;
+
+ private DeltaDecompressor() {
+ }
+
+ @Override
+ public int decompress(ByteBuffer compressedInput, ByteBuffer
decompressedOutput)
+ throws IOException {
+
+ // Read and validate type flag (only LONG supported), TODO: support INT
+ byte flag = compressedInput.get();
+ if (flag != LONG_FLAG) {
+ return decompressForInt(compressedInput, decompressedOutput);
+ }
+ return decompressForLong(compressedInput, decompressedOutput);
+ }
+
+ private int decompressForInt(ByteBuffer compressedInput, ByteBuffer
decompressedOutput)
+ throws IOException {
+ // Get number of longs
+ int numIntegers = compressedInput.getInt();
+ if (numIntegers == 0) {
+ decompressedOutput.flip();
+ return 0;
+ }
+
+ // Get first value
+ int prevValue = compressedInput.getInt();
+ decompressedOutput.putInt(prevValue);
+
+ if (numIntegers == 1) {
+ decompressedOutput.flip();
+ return Integer.BYTES;
+ }
+
+ // Get size of compressed delta values
+ int compressedSize = compressedInput.getInt();
+
+ // Create temporary buffer for decompressed deltas
+ ByteBuffer deltaBuffer = ByteBuffer.allocate((numIntegers - 1) *
Integer.BYTES);
+
+ // Get compressed delta values position
+ ByteBuffer compressedDeltas = compressedInput.slice();
+ compressedDeltas.limit(compressedSize);
+
+ // Decompress delta values using LZ4
+ LZ4_FACTORY.safeDecompressor().decompress(compressedDeltas, deltaBuffer);
+ deltaBuffer.flip();
+
+ // Reconstruct remaining values
+ for (int i = 1; i < numIntegers; i++) {
+ int delta = deltaBuffer.getInt();
+ int currentValue = prevValue + delta;
+ decompressedOutput.putInt(currentValue);
+ prevValue = currentValue;
+ }
+
+ // Make buffer ready for reading
+ decompressedOutput.flip();
+ return numIntegers * Integer.BYTES;
+ }
+
+ private int decompressForLong(ByteBuffer compressedInput, ByteBuffer
decompressedOutput)
+ throws IOException {
+ // Get number of longs
+ int numLongs = compressedInput.getInt();
+ if (numLongs == 0) {
+ decompressedOutput.flip();
+ return 0;
+ }
+
+ // Get first value
+ long prevValue = compressedInput.getLong();
+ decompressedOutput.putLong(prevValue);
+
+ if (numLongs == 1) {
+ decompressedOutput.flip();
+ return Long.BYTES;
+ }
+
+ // Get size of compressed delta values
+ int compressedSize = compressedInput.getInt();
+
+ // Create temporary buffer for decompressed deltas
+ ByteBuffer deltaBuffer = ByteBuffer.allocate((numLongs - 1) * Long.BYTES);
+
+ // Get compressed delta values position
+ ByteBuffer compressedDeltas = compressedInput.slice();
+ compressedDeltas.limit(compressedSize);
+
+ // Decompress delta values using LZ4
+ LZ4_FACTORY.safeDecompressor().decompress(compressedDeltas, deltaBuffer);
+ deltaBuffer.flip();
+
+ // Reconstruct remaining values
+ for (int i = 1; i < numLongs; i++) {
+ long delta = deltaBuffer.getLong();
+ long currentValue = prevValue + delta;
+ decompressedOutput.putLong(currentValue);
+ prevValue = currentValue;
+ }
+
+ // Make buffer ready for reading
+ decompressedOutput.flip();
+ return numLongs * Long.BYTES;
+ }
+
+ @Override
+ public int decompressedLength(ByteBuffer compressedInput) {
+ byte flag = compressedInput.get(compressedInput.position());
+ int numValues = compressedInput.getInt(compressedInput.position() + 1);
+ if (flag != LONG_FLAG) {
+ return numValues * Integer.BYTES;
+ }
+ return numValues * Long.BYTES;
+ }
+}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/DeltaDeltaCompressor.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/DeltaDeltaCompressor.java
new file mode 100644
index 00000000000..394772ac883
--- /dev/null
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/DeltaDeltaCompressor.java
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.io.compression;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import net.jpountz.lz4.LZ4Factory;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.compression.ChunkCompressor;
+
+
+/**
+ * Implementation of {@link ChunkCompressor} that applies delta-of-delta encoding to fixed-width INT or LONG values
+ * and then compresses the encoded values with LZ4.
+ *
+ * <p>Output layout, written starting at the current position of the output buffer:
+ * <ul>
+ *   <li>1 byte type flag: {@code 0} for INT, {@code 1} for LONG</li>
+ *   <li>4 bytes: number of values</li>
+ *   <li>the first value as-is (4 or 8 bytes), present when there is at least one value</li>
+ *   <li>4 bytes LZ4 block size followed by the LZ4 block, present when there are at least two values; the block
+ *   holds the first delta followed by the differences between consecutive deltas</li>
+ * </ul>
+ *
+ * <p>The value width is inferred from the input length only: a length that is a multiple of 8 takes the LONG path,
+ * any other multiple of 4 takes the INT path.
+ */
+class DeltaDeltaCompressor implements ChunkCompressor {
+
+  static final DeltaDeltaCompressor INSTANCE = new DeltaDeltaCompressor();
+  static final LZ4Factory LZ4_FACTORY = LZ4Factory.fastestInstance();
+  private static final byte INT_FLAG = 0;
+  private static final byte LONG_FLAG = 1;
+  // Type flag plus the 4-byte value count; written for every chunk, including an empty one.
+  private static final int HEADER_BYTES = Byte.BYTES + Integer.BYTES;
+
+  private DeltaDeltaCompressor() {
+  }
+
+  /**
+   * The compression works by:
+   * (1) Storing the first value as-is
+   * (2) Computing and storing the first delta (difference between second and first value)
+   * (3) For all subsequent values, storing the difference between consecutive deltas (delta of delta)
+   * (4) Compressing the stored deltas with LZ4
+   *
+   * Data benefits from delta of delta compression when:
+   * The data is sorted
+   * The differences between consecutive values are relatively constant
+   * The data consists of integers (it's specifically designed for integer sequences)
+   *
+   * On return {@code outCompressed} has been flipped.
+   *
+   * @param inUncompressed values to compress; the remaining byte count must be a multiple of 4
+   * @param outCompressed destination buffer, written from its current position
+   * @return number of bytes between the starting position of {@code outCompressed} and its limit after the flip
+   * @throws IOException if the remaining byte count of the input is not a multiple of 4
+   */
+  @Override
+  public int compress(ByteBuffer inUncompressed, ByteBuffer outCompressed)
+      throws IOException {
+    // Remember where writing started so the compressed size can be derived after the flip.
+    int outStartPosition = outCompressed.position();
+
+    int remaining = inUncompressed.remaining();
+    if (remaining % Integer.BYTES != 0) {
+      throw new IOException("Invalid input size: must be multiple of 4 bytes for INT or 8 bytes for LONG");
+    }
+    if (remaining % Long.BYTES != 0) {
+      return compressForInt(inUncompressed, outCompressed, outStartPosition);
+    }
+    return compressForLong(inUncompressed, outCompressed, outStartPosition);
+  }
+
+  /** Writes the flag, count, first value and LZ4-compressed delta-of-delta stream of 4-byte values. */
+  private int compressForInt(ByteBuffer inUncompressed, ByteBuffer outCompressed, int startPosition) {
+    outCompressed.put(INT_FLAG);
+    int numIntegers = inUncompressed.remaining() / Integer.BYTES;
+    outCompressed.putInt(numIntegers);
+    if (numIntegers == 0) {
+      return finish(outCompressed, startPosition);
+    }
+
+    // Store first value as-is
+    int prevValue = inUncompressed.getInt();
+    outCompressed.putInt(prevValue);
+    if (numIntegers == 1) {
+      return finish(outCompressed, startPosition);
+    }
+
+    ByteBuffer deltaBuffer = ByteBuffer.allocate((numIntegers - 1) * Integer.BYTES);
+
+    // The first delta has no predecessor, so it is stored unchanged.
+    int prevDelta = inUncompressed.getInt() - prevValue;
+    deltaBuffer.putInt(prevDelta);
+    prevValue += prevDelta;
+
+    // Every later entry is the change of the delta relative to the previous delta.
+    for (int i = 2; i < numIntegers; i++) {
+      int currentValue = inUncompressed.getInt();
+      int currentDelta = currentValue - prevValue;
+      deltaBuffer.putInt(currentDelta - prevDelta);
+      prevValue = currentValue;
+      prevDelta = currentDelta;
+    }
+
+    writeLz4Block(deltaBuffer, outCompressed);
+    return finish(outCompressed, startPosition);
+  }
+
+  /** Writes the flag, count, first value and LZ4-compressed delta-of-delta stream of 8-byte values. */
+  private int compressForLong(ByteBuffer inUncompressed, ByteBuffer outCompressed, int startPosition) {
+    outCompressed.put(LONG_FLAG);
+    int numLongs = inUncompressed.remaining() / Long.BYTES;
+    outCompressed.putInt(numLongs);
+    if (numLongs == 0) {
+      return finish(outCompressed, startPosition);
+    }
+
+    // Store first value as-is
+    long prevValue = inUncompressed.getLong();
+    outCompressed.putLong(prevValue);
+    if (numLongs == 1) {
+      return finish(outCompressed, startPosition);
+    }
+
+    ByteBuffer deltaBuffer = ByteBuffer.allocate((numLongs - 1) * Long.BYTES);
+
+    // The first delta has no predecessor, so it is stored unchanged.
+    long prevDelta = inUncompressed.getLong() - prevValue;
+    deltaBuffer.putLong(prevDelta);
+    prevValue += prevDelta;
+
+    // Every later entry is the change of the delta relative to the previous delta.
+    for (int i = 2; i < numLongs; i++) {
+      long currentValue = inUncompressed.getLong();
+      long currentDelta = currentValue - prevValue;
+      deltaBuffer.putLong(currentDelta - prevDelta);
+      prevValue = currentValue;
+      prevDelta = currentDelta;
+    }
+
+    writeLz4Block(deltaBuffer, outCompressed);
+    return finish(outCompressed, startPosition);
+  }
+
+  /**
+   * LZ4-compresses the filled {@code deltaBuffer} into {@code outCompressed}, preceded by the 4-byte size of the
+   * compressed block.
+   */
+  private static void writeLz4Block(ByteBuffer deltaBuffer, ByteBuffer outCompressed) {
+    deltaBuffer.flip();
+
+    // Skip the size slot first: the block size is only known once LZ4 has written the block.
+    int sizeSlot = outCompressed.position();
+    int blockStart = sizeSlot + Integer.BYTES;
+    outCompressed.position(blockStart);
+
+    LZ4_FACTORY.fastCompressor().compress(deltaBuffer, outCompressed);
+
+    // Absolute put: back-fills the size slot without moving the position.
+    outCompressed.putInt(sizeSlot, outCompressed.position() - blockStart);
+  }
+
+  /** Flips the output buffer and returns the distance from the start position to the new limit. */
+  private static int finish(ByteBuffer outCompressed, int startPosition) {
+    outCompressed.flip();
+    return outCompressed.limit() - startPosition;
+  }
+
+  /**
+   * Returns an upper bound for the number of bytes {@link #compress} writes for an input of the given size.
+   *
+   * @param uncompressedSize input size in bytes; must be a multiple of 4
+   * @return header and first-value bytes plus the LZ4 worst case for the delta payload
+   * @throws IllegalArgumentException if {@code uncompressedSize} is not a multiple of 4
+   */
+  @Override
+  public int maxCompressedSize(int uncompressedSize) {
+    if (uncompressedSize % Integer.BYTES != 0) {
+      throw new IllegalArgumentException("Invalid input size: must be multiple of 4 bytes for INT or 8 bytes for LONG");
+    }
+    // Same width inference as compress(): multiples of 8 are LONG, other multiples of 4 are INT.
+    int valueBytes = uncompressedSize % Long.BYTES != 0 ? Integer.BYTES : Long.BYTES;
+    int numValues = uncompressedSize / valueBytes;
+    if (numValues == 0) {
+      return HEADER_BYTES; // flag + number of values
+    }
+    if (numValues == 1) {
+      return HEADER_BYTES + valueBytes; // flag + number of values + first value
+    }
+    int deltaSize = (numValues - 1) * valueBytes;
+    // flag + number of values + first value + compressed size + compressed delta
+    return HEADER_BYTES + valueBytes + Integer.BYTES + LZ4_FACTORY.fastCompressor().maxCompressedLength(deltaSize);
+  }
+
+  @Override
+  public ChunkCompressionType compressionType() {
+    return ChunkCompressionType.DELTADELTA;
+  }
+}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/DeltaDeltaDecompressor.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/DeltaDeltaDecompressor.java
new file mode 100644
index 00000000000..3016a8c6c77
--- /dev/null
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/DeltaDeltaDecompressor.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.io.compression;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import net.jpountz.lz4.LZ4Factory;
+import org.apache.pinot.segment.spi.compression.ChunkDecompressor;
+
+
+/**
+ * A pass-through implementation of {@link ChunkDecompressor}, that simply
returns the input data without
+ * performing any de-compression. This is useful for cases where cost of
de-compression out-weighs the benefits
+ * of compression.
+ */
+class DeltaDeltaDecompressor implements ChunkDecompressor {
+
+ static final DeltaDeltaDecompressor INSTANCE = new DeltaDeltaDecompressor();
+ static final LZ4Factory LZ4_FACTORY = LZ4Factory.fastestInstance();
+ private static final byte LONG_FLAG = 1;
+
+ private DeltaDeltaDecompressor() {
+ }
+
+ @Override
+ public int decompress(ByteBuffer compressedInput, ByteBuffer
decompressedOutput)
+ throws IOException {
+ // Read and validate type flag
+ byte flag = compressedInput.get();
+ if (flag != LONG_FLAG) {
+ return decompressForInt(compressedInput, decompressedOutput);
+ }
+ return decompressForLong(compressedInput, decompressedOutput);
+ }
+
+ private int decompressForInt(ByteBuffer compressedInput, ByteBuffer
decompressedOutput) {
+ // Get number of integers
+ int numIntegers = compressedInput.getInt();
+ if (numIntegers == 0) {
+ decompressedOutput.flip();
+ return 0;
+ }
+
+ // Get first value
+ int prevValue = compressedInput.getInt();
+ decompressedOutput.putInt(prevValue);
+
+ if (numIntegers == 1) {
+ decompressedOutput.flip();
+ return Integer.BYTES;
+ }
+
+ // Get size of compressed delta values
+ int compressedSize = compressedInput.getInt();
+
+ // Create temporary buffer for decompressed deltas
+ ByteBuffer deltaBuffer = ByteBuffer.allocate((numIntegers - 1) *
Integer.BYTES);
+
+ // Get compressed delta values position
+ ByteBuffer compressedDeltas = compressedInput.slice();
+ compressedDeltas.limit(compressedSize);
+
+ // Decompress delta values using LZ4
+ LZ4_FACTORY.safeDecompressor().decompress(compressedDeltas, deltaBuffer);
+ deltaBuffer.flip();
+
+ // Get first delta
+ int prevDelta = deltaBuffer.getInt();
+ int currentValue = prevValue + prevDelta;
+ decompressedOutput.putInt(currentValue);
+ prevValue = currentValue;
+
+ // Decompress remaining values
+ for (int i = 2; i < numIntegers; i++) {
+ int deltaOfDelta = deltaBuffer.getInt();
+ int currentDelta = prevDelta + deltaOfDelta;
+ currentValue = prevValue + currentDelta;
+
+ decompressedOutput.putInt(currentValue);
+
+ prevValue = currentValue;
+ prevDelta = currentDelta;
+ }
+
+ // Make buffer ready for reading
+ decompressedOutput.flip();
+ return numIntegers * Integer.BYTES;
+ }
+
+ private int decompressForLong(ByteBuffer compressedInput, ByteBuffer
decompressedOutput) {
+ // Get number of integers
+ int numLongs = compressedInput.getInt();
+ if (numLongs == 0) {
+ decompressedOutput.flip();
+ return 0;
+ }
+
+ // Get first value
+ long prevValue = compressedInput.getLong();
+ decompressedOutput.putLong(prevValue);
+
+ if (numLongs == 1) {
+ decompressedOutput.flip();
+ return Long.BYTES;
+ }
+
+ // Get size of compressed delta values
+ int compressedSize = compressedInput.getInt();
+
+ // Create temporary buffer for decompressed deltas
+ ByteBuffer deltaBuffer = ByteBuffer.allocate((numLongs - 1) * Long.BYTES);
+
+ // Get compressed delta values position
+ ByteBuffer compressedDeltas = compressedInput.slice();
+ compressedDeltas.limit(compressedSize);
+
+ // Decompress delta values using LZ4
+ LZ4_FACTORY.safeDecompressor().decompress(compressedDeltas, deltaBuffer);
+ deltaBuffer.flip();
+
+ // Get first delta
+ long prevDelta = deltaBuffer.getLong();
+ long currentValue = prevValue + prevDelta;
+ decompressedOutput.putLong(currentValue);
+ prevValue = currentValue;
+
+ // Decompress remaining values
+ for (int i = 2; i < numLongs; i++) {
+ long deltaOfDelta = deltaBuffer.getLong();
+ long currentDelta = prevDelta + deltaOfDelta;
+ currentValue = prevValue + currentDelta;
+
+ decompressedOutput.putLong(currentValue);
+
+ prevValue = currentValue;
+ prevDelta = currentDelta;
+ }
+
+ // Make buffer ready for reading
+ decompressedOutput.flip();
+ return numLongs * Long.BYTES;
+ }
+
+ @Override
+ public int decompressedLength(ByteBuffer compressedInput) {
+ byte flag = compressedInput.get(compressedInput.position());
+ int numValues = compressedInput.getInt(compressedInput.position() + 1);
+ if (flag != LONG_FLAG) {
+ return numValues * Integer.BYTES;
+ }
+ return numValues * Long.BYTES;
+ }
+}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkForwardIndexReader.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkForwardIndexReader.java
index b11a991d33a..0ef13d23e63 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkForwardIndexReader.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkForwardIndexReader.java
@@ -217,7 +217,15 @@ public abstract class BaseChunkForwardIndexReader
implements ForwardIndexReader<
decompressedBuffer.clear();
try {
-
_chunkDecompressor.decompress(_dataBuffer.toDirectByteBuffer(chunkPosition,
chunkSize), decompressedBuffer);
+ if (_compressionType == ChunkCompressionType.DELTA || _compressionType
== ChunkCompressionType.DELTADELTA) {
+ // For delta-based compression, pre-size the output using
decompressor's length calculation.
+ ByteBuffer compressedBuffer =
_dataBuffer.toDirectByteBuffer(chunkPosition, chunkSize);
+ int decompressedSize =
_chunkDecompressor.decompressedLength(compressedBuffer);
+ decompressedBuffer = ByteBuffer.allocateDirect(decompressedSize);
+ _chunkDecompressor.decompress(compressedBuffer, decompressedBuffer);
+ } else {
+
_chunkDecompressor.decompress(_dataBuffer.toDirectByteBuffer(chunkPosition,
chunkSize), decompressedBuffer);
+ }
} catch (IOException e) {
LOGGER.error("Exception caught while decompressing data chunk", e);
throw new RuntimeException(e);
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/io/compression/DeltaCompressionTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/io/compression/DeltaCompressionTest.java
new file mode 100644
index 00000000000..b5bd8eb5ab8
--- /dev/null
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/io/compression/DeltaCompressionTest.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.io.compression;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.compression.ChunkCompressor;
+import org.apache.pinot.segment.spi.compression.ChunkDecompressor;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+/**
+ * Round-trip tests for the {@link ChunkCompressionType#DELTA} chunk codec.
+ *
+ * <p>Each test serializes a small array of {@code long} or {@code int} values into a direct buffer, compresses it,
+ * decompresses the result and checks that the same values come back in the same order. The empty and single-value
+ * cases additionally pin the exact compressed size (5 bytes for an empty input, 5 bytes plus one value width for a
+ * single value).
+ */
+public class DeltaCompressionTest {
+
+  /** Sentinel for {@link #roundTrip}: skip a size assertion that a given test case does not make. */
+  private static final int UNCHECKED = -1;
+
+  @Test
+  public void testRoundTripEmpty()
+      throws IOException {
+    long[] values = new long[]{};
+    assertLongs(roundTrip(toBuffer(values), 5, UNCHECKED), values);
+  }
+
+  @Test
+  public void testRoundTripSingleValue()
+      throws IOException {
+    long[] values = new long[]{10L};
+    assertLongs(roundTrip(toBuffer(values), 13, 8), values);
+  }
+
+  @Test
+  public void testRoundTripMultiValues()
+      throws IOException {
+    // Includes a repeated value and near-extreme values so that deltas of zero and very large magnitude are covered.
+    long[] values = new long[]{10L, 12L, 15L, 21L, 30L, 30L, 31L, Long.MIN_VALUE + 10L, Long.MAX_VALUE - 10L};
+    assertLongs(roundTrip(toBuffer(values), UNCHECKED, UNCHECKED), values);
+  }
+
+  @Test
+  public void testRoundTripEmptyInt()
+      throws IOException {
+    int[] values = new int[]{};
+    assertInts(roundTrip(toBuffer(values), 5, UNCHECKED), values);
+  }
+
+  @Test
+  public void testRoundTripSingleValueInt()
+      throws IOException {
+    int[] values = new int[]{10};
+    assertInts(roundTrip(toBuffer(values), 9, 4), values);
+  }
+
+  @Test
+  public void testRoundTripMultiValuesInt()
+      throws IOException {
+    // Includes a repeated value and near-extreme values so that deltas of zero and very large magnitude are covered.
+    int[] values = new int[]{10, 12, 15, 21, 30, 30, 31, Integer.MIN_VALUE + 10, Integer.MAX_VALUE - 10};
+    assertInts(roundTrip(toBuffer(values), UNCHECKED, UNCHECKED), values);
+  }
+
+  /**
+   * Compresses {@code input} with the DELTA compressor and decompresses the result with the DELTA decompressor.
+   *
+   * <p>Always asserts that the number of bytes reported by {@code decompress} equals the size of {@code input}.
+   *
+   * @param input buffer holding the raw values, positioned at 0 with its limit at the end of the data
+   * @param expectedCompressedSize expected return value of {@code compress}, or {@link #UNCHECKED} to skip the check
+   * @param expectedDecompressedLength expected return value of {@code decompressedLength}, or {@link #UNCHECKED} to
+   *     skip the check
+   * @return the buffer the decompressor wrote into, in whatever state the decompressor left it
+   * @throws IOException if the compressor or decompressor fails
+   */
+  private static ByteBuffer roundTrip(ByteBuffer input, int expectedCompressedSize, int expectedDecompressedLength)
+      throws IOException {
+    int inputSize = input.limit();
+    try (ChunkCompressor compressor = ChunkCompressorFactory.getCompressor(ChunkCompressionType.DELTA)) {
+      ByteBuffer compressed = ByteBuffer.allocateDirect(compressor.maxCompressedSize(inputSize));
+      int compressedSize = compressor.compress(input.slice(), compressed);
+      if (expectedCompressedSize != UNCHECKED) {
+        assertEquals(compressedSize, expectedCompressedSize);
+      }
+
+      try (ChunkDecompressor decompressor = ChunkCompressorFactory.getDecompressor(ChunkCompressionType.DELTA)) {
+        int decompressedLength = decompressor.decompressedLength(compressed);
+        if (expectedDecompressedLength != UNCHECKED) {
+          assertEquals(decompressedLength, expectedDecompressedLength);
+        }
+        ByteBuffer decompressed = ByteBuffer.allocateDirect(decompressedLength);
+        assertEquals(decompressor.decompress(compressed, decompressed), inputSize);
+        return decompressed;
+      }
+    }
+  }
+
+  /** Writes {@code values} into a new direct buffer and flips it so that it is ready to be read from position 0. */
+  private static ByteBuffer toBuffer(long[] values) {
+    ByteBuffer buffer = ByteBuffer.allocateDirect(values.length * Long.BYTES);
+    for (long value : values) {
+      buffer.putLong(value);
+    }
+    buffer.flip();
+    // Sanity check on the test fixture itself: the buffer holds exactly the bytes that were written.
+    assertEquals(buffer.remaining(), values.length * Long.BYTES);
+    return buffer;
+  }
+
+  /** Writes {@code values} into a new direct buffer and flips it so that it is ready to be read from position 0. */
+  private static ByteBuffer toBuffer(int[] values) {
+    ByteBuffer buffer = ByteBuffer.allocateDirect(values.length * Integer.BYTES);
+    for (int value : values) {
+      buffer.putInt(value);
+    }
+    buffer.flip();
+    // Sanity check on the test fixture itself: the buffer holds exactly the bytes that were written.
+    assertEquals(buffer.remaining(), values.length * Integer.BYTES);
+    return buffer;
+  }
+
+  /**
+   * Reads {@code expected.length} longs from the current position of {@code decompressed} and compares them in order.
+   * NOTE(review): relies on the decompressor leaving its output positioned for reading - confirm against
+   * DeltaDecompressor.
+   */
+  private static void assertLongs(ByteBuffer decompressed, long[] expected) {
+    for (long value : expected) {
+      assertEquals(decompressed.getLong(), value);
+    }
+  }
+
+  /**
+   * Reads {@code expected.length} ints from the current position of {@code decompressed} and compares them in order.
+   * NOTE(review): relies on the decompressor leaving its output positioned for reading - confirm against
+   * DeltaDecompressor.
+   */
+  private static void assertInts(ByteBuffer decompressed, int[] expected) {
+    for (int value : expected) {
+      assertEquals(decompressed.getInt(), value);
+    }
+  }
+}
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/io/compression/DeltaDeltaCompressionTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/io/compression/DeltaDeltaCompressionTest.java
new file mode 100644
index 00000000000..5411d561964
--- /dev/null
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/io/compression/DeltaDeltaCompressionTest.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.io.compression;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.compression.ChunkCompressor;
+import org.apache.pinot.segment.spi.compression.ChunkDecompressor;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+/**
+ * Round-trip tests for the {@link ChunkCompressionType#DELTADELTA} chunk codec.
+ *
+ * <p>Each test serializes a small array of {@code long} or {@code int} values into a direct buffer, compresses it,
+ * decompresses the result and checks that the decompressed byte count and the decoded values match the input.
+ */
+public class DeltaDeltaCompressionTest {
+
+  @Test
+  public void testRoundTripEmpty()
+      throws IOException {
+    long[] values = new long[]{};
+    assertLongs(roundTrip(toBuffer(values)), values);
+  }
+
+  @Test
+  public void testRoundTripSingleValue()
+      throws IOException {
+    long[] values = new long[]{100L};
+    assertLongs(roundTrip(toBuffer(values)), values);
+  }
+
+  @Test
+  public void testRoundTripMultiValues()
+      throws IOException {
+    // Consecutive differences are 5, 6, 7, ... so every second-order delta is 1.
+    long[] values = new long[]{100L, 105L, 111L, 118L, 126L, 135L, 145L, 156L, 168L};
+    assertLongs(roundTrip(toBuffer(values)), values);
+  }
+
+  @Test
+  public void testRoundTripEmptyInt()
+      throws IOException {
+    int[] values = new int[]{};
+    assertInts(roundTrip(toBuffer(values)), values);
+  }
+
+  @Test
+  public void testRoundTripSingleValueInt()
+      throws IOException {
+    int[] values = new int[]{100};
+    assertInts(roundTrip(toBuffer(values)), values);
+  }
+
+  @Test
+  public void testRoundTripMultiValuesInt()
+      throws IOException {
+    // Consecutive differences are 5, 6, 7, ... so every second-order delta is 1.
+    int[] values = new int[]{100, 105, 111, 118, 126, 135, 145, 156, 168};
+    assertInts(roundTrip(toBuffer(values)), values);
+  }
+
+  /**
+   * Compresses {@code input} with the DELTADELTA compressor and decompresses the result with the DELTADELTA
+   * decompressor, asserting that the number of bytes reported by {@code decompress} equals the size of {@code input}.
+   *
+   * @param input buffer holding the raw values, positioned at 0 with its limit at the end of the data
+   * @return the buffer the decompressor wrote into, in whatever state the decompressor left it
+   * @throws IOException if the compressor or decompressor fails
+   */
+  private static ByteBuffer roundTrip(ByteBuffer input)
+      throws IOException {
+    int inputSize = input.limit();
+    try (ChunkCompressor compressor = ChunkCompressorFactory.getCompressor(ChunkCompressionType.DELTADELTA)) {
+      ByteBuffer compressed = ByteBuffer.allocateDirect(compressor.maxCompressedSize(inputSize));
+      compressor.compress(input.slice(), compressed);
+
+      try (ChunkDecompressor decompressor = ChunkCompressorFactory.getDecompressor(ChunkCompressionType.DELTADELTA)) {
+        // The output buffer is sized from the length the decompressor derives from the compressed chunk.
+        ByteBuffer decompressed = ByteBuffer.allocateDirect(decompressor.decompressedLength(compressed));
+        assertEquals(decompressor.decompress(compressed, decompressed), inputSize);
+        return decompressed;
+      }
+    }
+  }
+
+  /** Writes {@code values} into a new direct buffer and flips it so that it is ready to be read from position 0. */
+  private static ByteBuffer toBuffer(long[] values) {
+    ByteBuffer buffer = ByteBuffer.allocateDirect(values.length * Long.BYTES);
+    for (long value : values) {
+      buffer.putLong(value);
+    }
+    buffer.flip();
+    return buffer;
+  }
+
+  /** Writes {@code values} into a new direct buffer and flips it so that it is ready to be read from position 0. */
+  private static ByteBuffer toBuffer(int[] values) {
+    ByteBuffer buffer = ByteBuffer.allocateDirect(values.length * Integer.BYTES);
+    for (int value : values) {
+      buffer.putInt(value);
+    }
+    buffer.flip();
+    return buffer;
+  }
+
+  /**
+   * Reads {@code expected.length} longs from the current position of {@code decompressed} and compares them in order.
+   * NOTE(review): relies on the decompressor leaving its output positioned for reading - confirm against
+   * DeltaDeltaDecompressor.
+   */
+  private static void assertLongs(ByteBuffer decompressed, long[] expected) {
+    for (long value : expected) {
+      assertEquals(decompressed.getLong(), value);
+    }
+  }
+
+  /**
+   * Reads {@code expected.length} ints from the current position of {@code decompressed} and compares them in order.
+   * NOTE(review): relies on the decompressor leaving its output positioned for reading - confirm against
+   * DeltaDeltaDecompressor.
+   */
+  private static void assertInts(ByteBuffer decompressed, int[] expected) {
+    for (int value : expected) {
+      assertEquals(decompressed.getInt(), value);
+    }
+  }
+}
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriterTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriterTest.java
index b2385edc270..b238bf1917f 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriterTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriterTest.java
@@ -67,7 +67,9 @@ public class VarByteChunkSVForwardIndexWriterTest implements
PinotBuffersAfterMe
int[] numbersOfDocs = {10, 1000};
int[][] entryLengths = {{1, 1}, {0, 10}, {0, 100}, {100, 100}, {900,
1000}};
int[] versions = {2, 3};
- return
Arrays.stream(ChunkCompressionType.values()).flatMap(chunkCompressionType ->
IntStream.of(versions).boxed()
+ return Arrays.stream(ChunkCompressionType.values())
+ .filter(t -> t != ChunkCompressionType.DELTA && t !=
ChunkCompressionType.DELTADELTA)
+ .flatMap(chunkCompressionType -> IntStream.of(versions).boxed()
.flatMap(version -> IntStream.of(numbersOfDocs).boxed().flatMap(
totalDocs -> IntStream.of(numDocsPerChunks).boxed()
.flatMap(numDocsPerChunk ->
Arrays.stream(entryLengths).map(lengths -> new Object[]{
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueFixedByteRawIndexCreatorTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueFixedByteRawIndexCreatorTest.java
index 6e1b9d6e819..6c958c1a014 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueFixedByteRawIndexCreatorTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueFixedByteRawIndexCreatorTest.java
@@ -59,6 +59,7 @@ public class MultiValueFixedByteRawIndexCreatorTest
implements PinotBuffersAfter
@DataProvider(name = "compressionTypes")
public Object[][] compressionTypes() {
return Arrays.stream(ChunkCompressionType.values())
+ .filter(t -> t != ChunkCompressionType.DELTA && t !=
ChunkCompressionType.DELTADELTA)
.flatMap(ct -> IntStream.rangeClosed(2, 5).boxed().map(writerVersion
-> new Object[]{ct, writerVersion}))
.toArray(Object[][]::new);
}
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java
index 96b973cdab2..2e14bcd3193 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java
@@ -61,6 +61,7 @@ public class MultiValueVarByteRawIndexCreatorTest implements
PinotBuffersAfterMe
@DataProvider
public Object[][] params() {
return Arrays.stream(ChunkCompressionType.values())
+ .filter(t -> t != ChunkCompressionType.DELTA && t !=
ChunkCompressionType.DELTADELTA)
.flatMap(chunkCompressionType -> IntStream.rangeClosed(2, 5)
.boxed()
.flatMap(writerVersion -> IntStream.of(10, 100)
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/FixedByteChunkSVForwardIndexTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/FixedByteChunkSVForwardIndexTest.java
index 3587345435e..2a2f3d589f1 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/FixedByteChunkSVForwardIndexTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/FixedByteChunkSVForwardIndexTest.java
@@ -56,6 +56,7 @@ public class FixedByteChunkSVForwardIndexTest implements
PinotBuffersAfterMethod
@DataProvider(name = "combinations")
public static Object[][] combinations() {
return Arrays.stream(ChunkCompressionType.values())
+ .filter(t -> t != ChunkCompressionType.DELTA && t !=
ChunkCompressionType.DELTADELTA)
.flatMap(chunkCompressionType -> IntStream.of(2, 3, 4)
.mapToObj(version -> new Object[]{chunkCompressionType, version}))
.toArray(Object[][]::new);
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressionType.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressionType.java
index 79c678c2609..d7bb4086719 100644
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressionType.java
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressionType.java
@@ -19,7 +19,7 @@
package org.apache.pinot.segment.spi.compression;
public enum ChunkCompressionType {
- PASS_THROUGH(0), SNAPPY(1), ZSTANDARD(2), LZ4(3), LZ4_LENGTH_PREFIXED(4),
GZIP(5);
+ PASS_THROUGH(0), SNAPPY(1), ZSTANDARD(2), LZ4(3), LZ4_LENGTH_PREFIXED(4),
GZIP(5), DELTA(6), DELTADELTA(7);
private static final ChunkCompressionType[] VALUES = values();
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java
index c34f0661b32..a4324a40711 100644
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java
@@ -132,6 +132,14 @@ public class ForwardIndexConfig extends IndexConfig {
_chunkCompressionType = ChunkCompressionType.GZIP;
_dictIdCompressionType = null;
break;
+ case DELTA:
+ _chunkCompressionType = ChunkCompressionType.DELTA;
+ _dictIdCompressionType = null;
+ break;
+ case DELTADELTA:
+ _chunkCompressionType = ChunkCompressionType.DELTADELTA;
+ _dictIdCompressionType = null;
+ break;
case MV_ENTRY_DICT:
_dictIdCompressionType = DictIdCompressionType.MV_ENTRY_DICT;
_chunkCompressionType = null;
@@ -177,6 +185,10 @@ public class ForwardIndexConfig extends IndexConfig {
return CompressionCodec.ZSTANDARD;
case LZ4:
return CompressionCodec.LZ4;
+ case DELTA:
+ return CompressionCodec.DELTA;
+ case DELTADELTA:
+ return CompressionCodec.DELTADELTA;
default:
throw new IllegalStateException("Unsupported chunk compression type:
" + chunkCompressionType);
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java
index 130c1db257c..e4f76c18445 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java
@@ -150,7 +150,10 @@ public class FieldConfig extends BaseJsonConfig {
CLP(false, false),
CLPV2(false, false),
CLPV2_ZSTD(false, false),
- CLPV2_LZ4(false, false);
+ CLPV2_LZ4(false, false),
+
+ DELTA(false, false),
+ DELTADELTA(false, false);
//@formatter:on
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org