This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new f3a7c3c580 Fixed a couple of issues in the CLP V2 implementation and added
integration tests. (#16298)
f3a7c3c580 is described below
commit f3a7c3c580a1ea6860d929ba26be45f87628e65c
Author: RAGHVENDRA KUMAR YADAV <[email protected]>
AuthorDate: Tue Jul 8 13:27:49 2025 -0700
Fixed a couple of issues in the CLP V2 implementation and added integration
tests. (#16298)
---
.../tests/CLPEncodingRealtimeIntegrationTest.java | 12 +++++++++---
.../segment/creator/impl/fwd/CLPForwardIndexCreatorV2.java | 4 +---
.../index/readers/forward/CLPForwardIndexReaderV2.java | 10 +++++++---
3 files changed, 17 insertions(+), 9 deletions(-)
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CLPEncodingRealtimeIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CLPEncodingRealtimeIntegrationTest.java
index d6b1ed1195..cc762327ce 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CLPEncodingRealtimeIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CLPEncodingRealtimeIntegrationTest.java
@@ -22,6 +22,7 @@ import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Random;
import javax.annotation.Nullable;
import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.config.table.TableConfig;
@@ -36,6 +37,8 @@ import org.testng.annotations.Test;
public class CLPEncodingRealtimeIntegrationTest extends
BaseClusterIntegrationTestSet {
private List<File> _avroFiles;
+ private FieldConfig.CompressionCodec _selectedCompressionCodec;
+ private final Random _random = new Random();
@BeforeClass
public void setUp()
@@ -43,6 +46,10 @@ public class CLPEncodingRealtimeIntegrationTest extends
BaseClusterIntegrationTe
TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
_avroFiles = unpackAvroData(_tempDir);
+ // Randomly select CLP or CLPV2 compression codec
+ _selectedCompressionCodec =
+ _random.nextBoolean() ? FieldConfig.CompressionCodec.CLP :
FieldConfig.CompressionCodec.CLPV2;
+
// Start the Pinot cluster
startZk();
// Start a customized controller with more frequent realtime segment
validation
@@ -130,9 +137,8 @@ public class CLPEncodingRealtimeIntegrationTest extends
BaseClusterIntegrationTe
@Override
protected List<FieldConfig> getFieldConfigs() {
List<FieldConfig> fieldConfigs = new ArrayList<>();
- fieldConfigs.add(
- new FieldConfig("logLine", FieldConfig.EncodingType.RAW, null, null,
FieldConfig.CompressionCodec.CLP, null,
- null, null, null));
+ fieldConfigs.add(new
FieldConfig.Builder("logLine").withEncodingType(FieldConfig.EncodingType.RAW)
+ .withCompressionCodec(_selectedCompressionCodec).build());
return fieldConfigs;
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/CLPForwardIndexCreatorV2.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/CLPForwardIndexCreatorV2.java
index 3b16cee494..b8ff2a103e 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/CLPForwardIndexCreatorV2.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/CLPForwardIndexCreatorV2.java
@@ -291,7 +291,7 @@ public class CLPForwardIndexCreatorV2 implements
ForwardIndexCreator {
_dictVarDictFile = new File(_intermediateFilesDir, _column + ".var.dict");
_dictVarDict = new VarLengthValueWriter(_dictVarDictFile, dictVarDictSize);
_dictVarDictSize = dictVarDictSize;
- _dictVarIdFwdIndexFile = new File(_dictVarIdFwdIndexFile, _column +
".dictVars");
+ _dictVarIdFwdIndexFile = new File(_intermediateFilesDir, _column +
".dictVars");
_dictVarIdFwdIndex =
new VarByteChunkForwardIndexWriterV5(_dictVarIdFwdIndexFile,
chunkCompressionType, _targetChunkSize);
@@ -397,8 +397,6 @@ public class CLPForwardIndexCreatorV2 implements
ForwardIndexCreator {
// Write intermediate files to memory mapped buffer
long totalSize = 0;
- _fileBuffer.putInt(MAGIC_BYTES.length);
- totalSize += Integer.BYTES;
_fileBuffer.put(MAGIC_BYTES);
totalSize += MAGIC_BYTES.length;
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/CLPForwardIndexReaderV2.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/CLPForwardIndexReaderV2.java
index 9ae0261c75..1181e1b252 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/CLPForwardIndexReaderV2.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/CLPForwardIndexReaderV2.java
@@ -27,6 +27,7 @@ import java.util.Arrays;
import org.apache.pinot.segment.local.io.util.PinotDataBitSet;
import org.apache.pinot.segment.local.io.util.VarLengthValueReader;
import
org.apache.pinot.segment.local.segment.creator.impl.fwd.CLPForwardIndexCreatorV2;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
@@ -78,9 +79,7 @@ public class CLPForwardIndexReaderV2 implements
ForwardIndexReader<CLPForwardInd
public CLPForwardIndexReaderV2(PinotDataBuffer pinotDataBuffer, int numDocs)
{
_numDocs = numDocs;
int offset = 0;
- int magicBytesLength = pinotDataBuffer.getInt(offset);
- offset += Integer.BYTES;
- byte[] magicBytes = new byte[magicBytesLength];
+ byte[] magicBytes = new byte[CLPForwardIndexCreatorV2.MAGIC_BYTES.length];
pinotDataBuffer.copyTo(offset, magicBytes);
// Validate against supported version
@@ -163,6 +162,11 @@ public class CLPForwardIndexReaderV2 implements
ForwardIndexReader<CLPForwardInd
}
}
+ @Override
+ public ChunkCompressionType getCompressionType() {
+ return ChunkCompressionType.PASS_THROUGH;
+ }
+
@Override
public boolean isDictionaryEncoded() {
return false;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]