This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new f3a7c3c580 Fixed couple of  issues in CLP V2  implementation and added 
integration tests. (#16298)
f3a7c3c580 is described below

commit f3a7c3c580a1ea6860d929ba26be45f87628e65c
Author: RAGHVENDRA KUMAR YADAV <[email protected]>
AuthorDate: Tue Jul 8 13:27:49 2025 -0700

    Fixed couple of  issues in CLP V2  implementation and added integration 
tests. (#16298)
---
 .../tests/CLPEncodingRealtimeIntegrationTest.java            | 12 +++++++++---
 .../segment/creator/impl/fwd/CLPForwardIndexCreatorV2.java   |  4 +---
 .../index/readers/forward/CLPForwardIndexReaderV2.java       | 10 +++++++---
 3 files changed, 17 insertions(+), 9 deletions(-)

diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CLPEncodingRealtimeIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CLPEncodingRealtimeIntegrationTest.java
index d6b1ed1195..cc762327ce 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CLPEncodingRealtimeIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CLPEncodingRealtimeIntegrationTest.java
@@ -22,6 +22,7 @@ import java.io.File;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Random;
 import javax.annotation.Nullable;
 import org.apache.pinot.spi.config.table.FieldConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
@@ -36,6 +37,8 @@ import org.testng.annotations.Test;
 
 public class CLPEncodingRealtimeIntegrationTest extends 
BaseClusterIntegrationTestSet {
   private List<File> _avroFiles;
+  private FieldConfig.CompressionCodec _selectedCompressionCodec;
+  private final Random _random = new Random();
 
   @BeforeClass
   public void setUp()
@@ -43,6 +46,10 @@ public class CLPEncodingRealtimeIntegrationTest extends 
BaseClusterIntegrationTe
     TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
     _avroFiles = unpackAvroData(_tempDir);
 
+    // Randomly select CLP or CLPV2 compression codec
+    _selectedCompressionCodec =
+        _random.nextBoolean() ? FieldConfig.CompressionCodec.CLP : 
FieldConfig.CompressionCodec.CLPV2;
+
     // Start the Pinot cluster
     startZk();
     // Start a customized controller with more frequent realtime segment 
validation
@@ -130,9 +137,8 @@ public class CLPEncodingRealtimeIntegrationTest extends 
BaseClusterIntegrationTe
   @Override
   protected List<FieldConfig> getFieldConfigs() {
     List<FieldConfig> fieldConfigs = new ArrayList<>();
-    fieldConfigs.add(
-        new FieldConfig("logLine", FieldConfig.EncodingType.RAW, null, null, 
FieldConfig.CompressionCodec.CLP, null,
-            null, null, null));
+    fieldConfigs.add(new 
FieldConfig.Builder("logLine").withEncodingType(FieldConfig.EncodingType.RAW)
+        .withCompressionCodec(_selectedCompressionCodec).build());
 
     return fieldConfigs;
   }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/CLPForwardIndexCreatorV2.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/CLPForwardIndexCreatorV2.java
index 3b16cee494..b8ff2a103e 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/CLPForwardIndexCreatorV2.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/CLPForwardIndexCreatorV2.java
@@ -291,7 +291,7 @@ public class CLPForwardIndexCreatorV2 implements 
ForwardIndexCreator {
     _dictVarDictFile = new File(_intermediateFilesDir, _column + ".var.dict");
     _dictVarDict = new VarLengthValueWriter(_dictVarDictFile, dictVarDictSize);
     _dictVarDictSize = dictVarDictSize;
-    _dictVarIdFwdIndexFile = new File(_dictVarIdFwdIndexFile, _column + 
".dictVars");
+    _dictVarIdFwdIndexFile = new File(_intermediateFilesDir, _column + 
".dictVars");
     _dictVarIdFwdIndex =
         new VarByteChunkForwardIndexWriterV5(_dictVarIdFwdIndexFile, 
chunkCompressionType, _targetChunkSize);
 
@@ -397,8 +397,6 @@ public class CLPForwardIndexCreatorV2 implements 
ForwardIndexCreator {
 
       // Write intermediate files to memory mapped buffer
       long totalSize = 0;
-      _fileBuffer.putInt(MAGIC_BYTES.length);
-      totalSize += Integer.BYTES;
       _fileBuffer.put(MAGIC_BYTES);
       totalSize += MAGIC_BYTES.length;
 
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/CLPForwardIndexReaderV2.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/CLPForwardIndexReaderV2.java
index 9ae0261c75..1181e1b252 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/CLPForwardIndexReaderV2.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/CLPForwardIndexReaderV2.java
@@ -27,6 +27,7 @@ import java.util.Arrays;
 import org.apache.pinot.segment.local.io.util.PinotDataBitSet;
 import org.apache.pinot.segment.local.io.util.VarLengthValueReader;
 import 
org.apache.pinot.segment.local.segment.creator.impl.fwd.CLPForwardIndexCreatorV2;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
 import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
 import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
@@ -78,9 +79,7 @@ public class CLPForwardIndexReaderV2 implements 
ForwardIndexReader<CLPForwardInd
   public CLPForwardIndexReaderV2(PinotDataBuffer pinotDataBuffer, int numDocs) 
{
     _numDocs = numDocs;
     int offset = 0;
-    int magicBytesLength = pinotDataBuffer.getInt(offset);
-    offset += Integer.BYTES;
-    byte[] magicBytes = new byte[magicBytesLength];
+    byte[] magicBytes = new byte[CLPForwardIndexCreatorV2.MAGIC_BYTES.length];
     pinotDataBuffer.copyTo(offset, magicBytes);
 
     // Validate against supported version
@@ -163,6 +162,11 @@ public class CLPForwardIndexReaderV2 implements 
ForwardIndexReader<CLPForwardInd
     }
   }
 
+  @Override
+  public ChunkCompressionType getCompressionType() {
+    return ChunkCompressionType.PASS_THROUGH;
+  }
+
   @Override
   public boolean isDictionaryEncoded() {
     return false;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to