This is an automated email from the ASF dual-hosted git repository.

saurabhd336 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 02d1c128f9 CLP as a compressionCodec (#12504)
02d1c128f9 is described below

commit 02d1c128f9b97b12d3f4c7d10f432ffabcc286f1
Author: Saurabh Dubey <[email protected]>
AuthorDate: Thu Mar 21 15:58:48 2024 +0530

    CLP as a compressionCodec (#12504)
    
    ---------
    
    Co-authored-by: Saurabh Dubey 
<[email protected]>
    Co-authored-by: Saurabh Dubey <[email protected]>
    Co-authored-by: Seunghyun Lee <[email protected]>
---
 .../tests/CLPEncodingRealtimeIntegrationTest.java  | 150 ++++++++++++
 .../src/test/resources/clpEncodingITData.tar.gz    | Bin 0 -> 1863 bytes
 ...clpEncodingRealtimeIntegrationTestSchema.schema |  18 ++
 pinot-segment-local/pom.xml                        |   4 +
 .../local/io/util/VarLengthValueReader.java        |  11 +
 .../writer/impl/FixedBitMVForwardIndexWriter.java  |   2 +-
 .../writer/impl/FixedBitSVForwardIndexWriter.java  |   2 +-
 .../stats/MutableNoDictionaryColStatistics.java    |  13 +-
 .../impl/forward/CLPMutableForwardIndex.java       | 178 ++++++++++++++
 .../creator/impl/SegmentDictionaryCreator.java     |   9 +
 .../creator/impl/fwd/CLPForwardIndexCreatorV1.java | 272 +++++++++++++++++++++
 .../fwd/MultiValueFixedByteRawIndexCreator.java    |  15 +-
 .../stats/AbstractColumnStatisticsCollector.java   |   3 +
 .../creator/impl/stats/CLPStatsProvider.java       |  66 +++++
 .../stats/StringColumnPreIndexStatsCollector.java  |  97 +++++++-
 .../index/forward/ForwardIndexCreatorFactory.java  |   5 +
 .../index/forward/ForwardIndexReaderFactory.java   |  10 +
 .../segment/index/forward/ForwardIndexType.java    |   6 +-
 .../readers/forward/CLPForwardIndexReaderV1.java   | 219 +++++++++++++++++
 .../segment/local/utils/TableConfigUtils.java      |  10 +-
 .../local/segment/creator/DictionariesTest.java    |  51 +++-
 .../index/creator/CLPForwardIndexCreatorTest.java  | 117 +++++++++
 .../mutable/CLPMutableForwardIndexTest.java        | 104 ++++++++
 .../spi/creator/ColumnIndexCreationInfo.java       |   4 +
 .../segment/spi/creator/IndexCreationContext.java  |  24 +-
 .../spi/creator/SegmentGeneratorConfig.java        |   3 +-
 .../segment/spi/creator/StatsCollectorConfig.java  |  14 ++
 .../segment/spi/index/ForwardIndexConfig.java      |   1 +
 .../apache/pinot/spi/config/table/FieldConfig.java |   3 +
 29 files changed, 1388 insertions(+), 23 deletions(-)

diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CLPEncodingRealtimeIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CLPEncodingRealtimeIntegrationTest.java
new file mode 100644
index 0000000000..c8b5d88646
--- /dev/null
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CLPEncodingRealtimeIntegrationTest.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.util.TestUtils;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class CLPEncodingRealtimeIntegrationTest extends 
BaseClusterIntegrationTestSet {
+  private List<File> _avroFiles;
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
+    _avroFiles = unpackAvroData(_tempDir);
+
+    // Start the Pinot cluster
+    startZk();
+    // Start a customized controller with more frequent realtime segment 
validation
+    startController();
+    startBroker();
+    startServers(1);
+
+    startKafka();
+    pushAvroIntoKafka(_avroFiles);
+
+    Schema schema = createSchema();
+    addSchema(schema);
+    TableConfig tableConfig = createRealtimeTableConfig(_avroFiles.get(0));
+    addTableConfig(tableConfig);
+
+    waitForAllDocsLoaded(600_000L);
+  }
+
+  @Nullable
+  @Override
+  protected List<String> getInvertedIndexColumns() {
+    return null;
+  }
+
+  @Nullable
+  @Override
+  protected List<String> getRangeIndexColumns() {
+    return null;
+  }
+
+  @Nullable
+  @Override
+  protected List<String> getBloomFilterColumns() {
+    return null;
+  }
+
+  @Nullable
+  @Override
+  protected String getSortedColumn() {
+    return null;
+  }
+
+  @Override
+  protected List<String> getNoDictionaryColumns() {
+    return Collections.singletonList("logLine");
+  }
+
+  @Test
+  public void testValues()
+      throws Exception {
+    Assert.assertEquals(getPinotConnection().execute(
+            "SELECT count(*) FROM " + getTableName() + " WHERE 
REGEXP_LIKE(logLine, '.*executor.*')").getResultSet(0)
+        .getLong(0), 53);
+  }
+
+  protected int getRealtimeSegmentFlushSize() {
+    return 30;
+  }
+
+  @Override
+  protected long getCountStarResult() {
+    return 100;
+  }
+
+  @Override
+  protected String getTableName() {
+    return "clpEncodingIT";
+  }
+
+  @Override
+  protected String getAvroTarFileName() {
+    return "clpEncodingITData.tar.gz";
+  }
+
+  @Override
+  protected String getSchemaFileName() {
+    return "clpEncodingRealtimeIntegrationTestSchema.schema";
+  }
+
+  @Override
+  protected String getTimeColumnName() {
+    return "timestampInEpoch";
+  }
+
+  @Override
+  protected List<FieldConfig> getFieldConfigs() {
+    List<FieldConfig> fieldConfigs = new ArrayList<>();
+    fieldConfigs.add(
+        new FieldConfig("logLine", FieldConfig.EncodingType.RAW, null, null, 
FieldConfig.CompressionCodec.CLP, null,
+            null, null, null));
+
+    return fieldConfigs;
+  }
+
+  @Override
+  protected IngestionConfig getIngestionConfig() {
+    List<TransformConfig> transforms = new ArrayList<>();
+    transforms.add(new TransformConfig("timestampInEpoch", "now()"));
+
+    IngestionConfig ingestionConfig = new IngestionConfig();
+    ingestionConfig.setTransformConfigs(transforms);
+
+    return ingestionConfig;
+  }
+}
diff --git 
a/pinot-integration-tests/src/test/resources/clpEncodingITData.tar.gz 
b/pinot-integration-tests/src/test/resources/clpEncodingITData.tar.gz
new file mode 100644
index 0000000000..c2f5baaa5f
Binary files /dev/null and 
b/pinot-integration-tests/src/test/resources/clpEncodingITData.tar.gz differ
diff --git 
a/pinot-integration-tests/src/test/resources/clpEncodingRealtimeIntegrationTestSchema.schema
 
b/pinot-integration-tests/src/test/resources/clpEncodingRealtimeIntegrationTestSchema.schema
new file mode 100644
index 0000000000..9aa38a1a19
--- /dev/null
+++ 
b/pinot-integration-tests/src/test/resources/clpEncodingRealtimeIntegrationTestSchema.schema
@@ -0,0 +1,18 @@
+{
+  "schemaName": "clpEncodingIT",
+  "dimensionFieldSpecs": [
+    {
+      "name": "logLine",
+      "dataType": "STRING"
+    }
+  ],
+  "dateTimeFieldSpecs": [
+    {
+      "name": "timestampInEpoch",
+      "dataType": "LONG",
+      "notNull": false,
+      "format": "1:MILLISECONDS:EPOCH",
+      "granularity": "1:MILLISECONDS"
+    }
+  ]
+}
\ No newline at end of file
diff --git a/pinot-segment-local/pom.xml b/pinot-segment-local/pom.xml
index 27a7dcd8c6..ac06098c24 100644
--- a/pinot-segment-local/pom.xml
+++ b/pinot-segment-local/pom.xml
@@ -151,5 +151,9 @@
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>com.yscope.clp</groupId>
+      <artifactId>clp-ffi</artifactId>
+    </dependency>
   </dependencies>
 </project>
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/util/VarLengthValueReader.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/util/VarLengthValueReader.java
index cd1346896a..d9384bf16f 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/util/VarLengthValueReader.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/util/VarLengthValueReader.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pinot.segment.local.io.util;
 
+import java.util.List;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
@@ -101,6 +103,15 @@ public class VarLengthValueReader implements ValueReader {
     return new String(buffer, 0, length, UTF_8);
   }
 
+  /**
+   * Appends to {@code rangeList} the two byte ranges that are read when fetching the value at {@code index}: first
+   * the pair of int offsets (start and end) for the value, then the value bytes themselves. Both ranges are shifted
+   * by {@code baseOffset}.
+   *
+   * @param index index of the value
+   * @param baseOffset offset added to every recorded range start
+   * @param rangeList list that receives the two ranges, in the order described above
+   */
+  public void recordOffsetRanges(int index, long baseOffset, List<ForwardIndexReader.ByteRange> rangeList) {
+    int offsetEntryPosition = _dataSectionStartOffSet + Integer.BYTES * index;
+    int valueStart = _dataBuffer.getInt(offsetEntryPosition);
+    int valueEnd = _dataBuffer.getInt(offsetEntryPosition + Integer.BYTES);
+
+    // Range covering the two consecutive offset entries
+    rangeList.add(new ForwardIndexReader.ByteRange(offsetEntryPosition + baseOffset, 2 * Integer.BYTES));
+    // Range covering the value bytes
+    rangeList.add(new ForwardIndexReader.ByteRange(valueStart + baseOffset, valueEnd - valueStart));
+  }
+
   @Override
   public String getPaddedString(int index, int numBytesPerValue, byte[] 
buffer) {
     throw new UnsupportedOperationException();
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/FixedBitMVForwardIndexWriter.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/FixedBitMVForwardIndexWriter.java
index 650a75e28f..fd46a843ee 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/FixedBitMVForwardIndexWriter.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/FixedBitMVForwardIndexWriter.java
@@ -75,7 +75,7 @@ public class FixedBitMVForwardIndexWriter implements 
Closeable {
   private int _nextDocId = 0;
 
   public FixedBitMVForwardIndexWriter(File file, int numDocs, int 
totalNumValues, int numBitsPerValue)
-      throws Exception {
+      throws IOException {
     float averageValuesPerDoc = totalNumValues / numDocs;
     _docsPerChunk = (int) (Math.ceil(PREFERRED_NUM_VALUES_PER_CHUNK / 
averageValuesPerDoc));
     _numChunks = (numDocs + _docsPerChunk - 1) / _docsPerChunk;
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/FixedBitSVForwardIndexWriter.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/FixedBitSVForwardIndexWriter.java
index b7c8c55c5b..f6712b38bf 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/FixedBitSVForwardIndexWriter.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/FixedBitSVForwardIndexWriter.java
@@ -37,7 +37,7 @@ public class FixedBitSVForwardIndexWriter implements 
Closeable {
   private int _nextDocId = 0;
 
   public FixedBitSVForwardIndexWriter(File file, int numDocs, int 
numBitsPerValue)
-      throws Exception {
+      throws IOException {
     // Convert to long in order to avoid int overflow
     long length = ((long) numDocs * numBitsPerValue + Byte.SIZE - 1) / 
Byte.SIZE;
     // Backward-compatible: index file is always big-endian
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/MutableNoDictionaryColStatistics.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/MutableNoDictionaryColStatistics.java
index 0cf4c37b96..5f2d98893b 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/MutableNoDictionaryColStatistics.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/MutableNoDictionaryColStatistics.java
@@ -21,6 +21,8 @@ package 
org.apache.pinot.segment.local.realtime.converter.stats;
 import com.google.common.base.Preconditions;
 import java.util.Map;
 import java.util.Set;
+import 
org.apache.pinot.segment.local.realtime.impl.forward.CLPMutableForwardIndex;
+import 
org.apache.pinot.segment.local.segment.creator.impl.stats.CLPStatsProvider;
 import org.apache.pinot.segment.spi.creator.ColumnStatistics;
 import org.apache.pinot.segment.spi.datasource.DataSource;
 import org.apache.pinot.segment.spi.datasource.DataSourceMetadata;
@@ -30,7 +32,7 @@ import 
org.apache.pinot.segment.spi.partition.PartitionFunction;
 import static org.apache.pinot.segment.spi.Constants.UNKNOWN_CARDINALITY;
 
 
-public class MutableNoDictionaryColStatistics implements ColumnStatistics {
+public class MutableNoDictionaryColStatistics implements ColumnStatistics, 
CLPStatsProvider {
   private final DataSourceMetadata _dataSourceMetadata;
   private final MutableForwardIndex _forwardIndex;
 
@@ -111,4 +113,13 @@ public class MutableNoDictionaryColStatistics implements 
ColumnStatistics {
   public Set<Integer> getPartitions() {
     return _dataSourceMetadata.getPartitions();
   }
+
+  @Override
+  public CLPStats getCLPStats() {
+    if (_forwardIndex instanceof CLPMutableForwardIndex) {
+      return ((CLPMutableForwardIndex) _forwardIndex).getCLPStats();
+    }
+    throw new IllegalStateException(
+        "CLP stats not available for column: " + 
_dataSourceMetadata.getFieldSpec().getName());
+  }
 }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/forward/CLPMutableForwardIndex.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/forward/CLPMutableForwardIndex.java
new file mode 100644
index 0000000000..bed7a4e9be
--- /dev/null
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/forward/CLPMutableForwardIndex.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.realtime.impl.forward;
+
+import com.yscope.clp.compressorfrontend.BuiltInVariableHandlingRuleVersions;
+import com.yscope.clp.compressorfrontend.EncodedMessage;
+import com.yscope.clp.compressorfrontend.MessageDecoder;
+import com.yscope.clp.compressorfrontend.MessageEncoder;
+import java.io.IOException;
+import 
org.apache.pinot.segment.local.realtime.impl.dictionary.StringOffHeapMutableDictionary;
+import 
org.apache.pinot.segment.local.segment.creator.impl.stats.CLPStatsProvider;
+import 
org.apache.pinot.segment.local.segment.creator.impl.stats.StringColumnPreIndexStatsCollector;
+import org.apache.pinot.segment.local.segment.index.forward.ForwardIndexType;
+import org.apache.pinot.segment.spi.index.mutable.MutableDictionary;
+import org.apache.pinot.segment.spi.index.mutable.MutableForwardIndex;
+import org.apache.pinot.segment.spi.memory.PinotDataBufferMemoryManager;
+import org.apache.pinot.spi.data.FieldSpec;
+
+
+/**
+ * Mutable forward index that stores string values in CLP-encoded form.
+ *
+ * <p>Every value written through {@link #setString(int, String)} is passed to the CLP {@link MessageEncoder}, which
+ * yields a log type, an array of dictionary variables and an array of encoded variables. These are kept in:
+ * <ul>
+ *   <li>a mutable dictionary plus a single-value dict-id forward index for the log type,</li>
+ *   <li>a mutable dictionary plus a multi-value dict-id forward index for the dictionary variables,</li>
+ *   <li>a multi-value long forward index for the encoded variables.</li>
+ * </ul>
+ * {@link #getString(int)} reads the three parts back and decodes them with the CLP {@link MessageDecoder}.
+ *
+ * <p>NOTE(review): the encoder, decoder and {@link EncodedMessage} instances are shared across calls; thread-safety of
+ * concurrent readers/writers depends on those CLP classes and should be confirmed.
+ */
+public class CLPMutableForwardIndex implements MutableForwardIndex {
+  // TODO: We can get better dynamic estimates using segment stats
+  private static final int ESTIMATED_LOG_TYPE_CARDINALITY = 10000;
+  private static final int ESTIMATED_DICT_VARS_CARDINALITY = 10000;
+  private static final int ESTIMATED_LOG_TYPE_LENGTH = 200;
+  private static final int ESTIMATED_DICT_VARS_LENGTH = 50;
+  private final FieldSpec.DataType _storedType;
+  private final EncodedMessage _clpEncodedMessage;
+  private final MessageEncoder _clpMessageEncoder;
+  private final MessageDecoder _clpMessageDecoder;
+  private final MutableDictionary _logTypeDictCreator;
+  private final MutableDictionary _dictVarsDictCreator;
+  private final FixedByteSVMutableForwardIndex _logTypeFwdIndex;
+  private final FixedByteMVMutableForwardIndex _dictVarsFwdIndex;
+  private final FixedByteMVMutableForwardIndex _encodedVarsFwdIndex;
+
+  // clp stats
+  int _totalNumberOfDictVars = 0;
+  int _maxNumberOfEncodedVars = 0;
+  int _totalNumberOfEncodedVars = 0;
+  // Starts at MAX_VALUE so that the first recorded value always becomes the minimum. Starting at 0 would pin the
+  // minimum to 0 forever because Math.min(0, length) can never grow.
+  private int _lengthOfShortestElement = Integer.MAX_VALUE;
+  private int _lengthOfLongestElement;
+
+  /**
+   * Creates the dictionaries and forward indexes backing the CLP-encoded column.
+   *
+   * @param columnName column name, used as prefix of the allocation contexts passed to the memory manager
+   * @param storedType stored type reported by {@link #getStoredType()}
+   * @param memoryManager memory manager used by the dictionaries and forward indexes
+   * @param capacity capacity hint passed to the forward indexes
+   */
+  public CLPMutableForwardIndex(String columnName, FieldSpec.DataType storedType,
+      PinotDataBufferMemoryManager memoryManager, int capacity) {
+    _clpEncodedMessage = new EncodedMessage();
+    _clpMessageEncoder = new MessageEncoder(BuiltInVariableHandlingRuleVersions.VariablesSchemaV2,
+        BuiltInVariableHandlingRuleVersions.VariableEncodingMethodsV1);
+    _logTypeDictCreator =
+        new StringOffHeapMutableDictionary(ESTIMATED_LOG_TYPE_CARDINALITY, ESTIMATED_LOG_TYPE_CARDINALITY / 10,
+            memoryManager, columnName + "_logType.dict", ESTIMATED_LOG_TYPE_LENGTH);
+    _dictVarsDictCreator =
+        new StringOffHeapMutableDictionary(ESTIMATED_DICT_VARS_CARDINALITY, ESTIMATED_DICT_VARS_CARDINALITY / 10,
+            memoryManager, columnName + "_dictVars.dict", ESTIMATED_DICT_VARS_LENGTH);
+
+    _logTypeFwdIndex = new FixedByteSVMutableForwardIndex(true, FieldSpec.DataType.INT, capacity, memoryManager,
+        columnName + "_logType.fwd");
+    _dictVarsFwdIndex =
+        new FixedByteMVMutableForwardIndex(ForwardIndexType.MAX_MULTI_VALUES_PER_ROW, 20, capacity, Integer.BYTES,
+            memoryManager, columnName + "_dictVars.fwd", true, FieldSpec.DataType.INT);
+    _encodedVarsFwdIndex =
+        new FixedByteMVMutableForwardIndex(ForwardIndexType.MAX_MULTI_VALUES_PER_ROW, 20, capacity, Long.BYTES,
+            memoryManager, columnName + "_encodedVars.fwd", true, FieldSpec.DataType.LONG);
+    _clpMessageDecoder = new MessageDecoder(BuiltInVariableHandlingRuleVersions.VariablesSchemaV2,
+        BuiltInVariableHandlingRuleVersions.VariableEncodingMethodsV1);
+    _storedType = storedType;
+  }
+
+  /**
+   * Returns the smallest {@link String#length()} among the values written so far, or 0 if nothing has been written.
+   */
+  @Override
+  public int getLengthOfShortestElement() {
+    // MAX_VALUE is the "nothing recorded yet" sentinel; keep reporting 0 in that case
+    return _lengthOfShortestElement == Integer.MAX_VALUE ? 0 : _lengthOfShortestElement;
+  }
+
+  /**
+   * Returns the largest {@link String#length()} among the values written so far, or 0 if nothing has been written.
+   */
+  @Override
+  public int getLengthOfLongestElement() {
+    return _lengthOfLongestElement;
+  }
+
+  /** Always {@code false}: callers read and write plain strings, not dictionary ids. */
+  @Override
+  public boolean isDictionaryEncoded() {
+    return false;
+  }
+
+  /** Always {@code true}: one string value is stored per document. */
+  @Override
+  public boolean isSingleValue() {
+    return true;
+  }
+
+  @Override
+  public FieldSpec.DataType getStoredType() {
+    return _storedType;
+  }
+
+  /**
+   * CLP-encodes {@code value} and stores its log type, dictionary variables and encoded variables for {@code docId},
+   * updating the length and CLP statistics along the way.
+   *
+   * @throws IllegalArgumentException if the CLP encoder fails on {@code value}
+   */
+  @Override
+  public void setString(int docId, String value) {
+    String logtype;
+    String[] dictVars;
+    Long[] encodedVars;
+
+    // NOTE(review): lengths are tracked in chars (String#length), not in encoded bytes — confirm this is intended
+    _lengthOfLongestElement = Math.max(_lengthOfLongestElement, value.length());
+    _lengthOfShortestElement = Math.min(_lengthOfShortestElement, value.length());
+
+    try {
+      _clpMessageEncoder.encodeMessage(value, _clpEncodedMessage);
+      logtype = _clpEncodedMessage.getLogTypeAsString();
+      dictVars = _clpEncodedMessage.getDictionaryVarsAsStrings();
+      encodedVars = _clpEncodedMessage.getEncodedVarsAsBoxedLongs();
+    } catch (IOException e) {
+      throw new IllegalArgumentException("Failed to encode message: " + value, e);
+    }
+
+    _totalNumberOfDictVars += dictVars.length;
+    _totalNumberOfEncodedVars += encodedVars.length;
+    _maxNumberOfEncodedVars = Math.max(_maxNumberOfEncodedVars, encodedVars.length);
+
+    int logTypeDictId = _logTypeDictCreator.index(logtype);
+    _logTypeFwdIndex.setDictId(docId, logTypeDictId);
+
+    int[] dictVarsDictIds = new int[dictVars.length];
+    for (int i = 0; i < dictVars.length; i++) {
+      dictVarsDictIds[i] = _dictVarsDictCreator.index(dictVars[i]);
+    }
+    _dictVarsFwdIndex.setDictIdMV(docId, dictVarsDictIds);
+
+    // The MV forward index takes primitives, so unbox the encoded variables
+    long[] encodedVarsLongs = new long[encodedVars.length];
+    for (int i = 0; i < encodedVars.length; i++) {
+      encodedVarsLongs[i] = encodedVars[i];
+    }
+    _encodedVarsFwdIndex.setLongMV(docId, encodedVarsLongs);
+  }
+
+  /**
+   * Reads the log type, dictionary variables and encoded variables stored for {@code docId} and decodes them back
+   * into the original string.
+   *
+   * @throws IllegalArgumentException if the CLP decoder fails
+   */
+  @Override
+  public String getString(int docId) {
+    String logType = _logTypeDictCreator.getStringValue(_logTypeFwdIndex.getDictId(docId));
+    int[] dictVarsDictIds = _dictVarsFwdIndex.getDictIdMV(docId);
+    String[] dictVars = new String[dictVarsDictIds.length];
+    for (int i = 0; i < dictVarsDictIds.length; i++) {
+      dictVars[i] = _dictVarsDictCreator.getStringValue(dictVarsDictIds[i]);
+    }
+    long[] encodedVarsLongs = _encodedVarsFwdIndex.getLongMV(docId);
+    try {
+      return _clpMessageDecoder.decodeMessage(logType, dictVars, encodedVarsLongs);
+    } catch (IOException e) {
+      throw new IllegalArgumentException("Failed to decode message: " + logType, e);
+    }
+  }
+
+  /**
+   * Builds a CLP stats snapshot from the sorted values of both dictionaries and the counters maintained by
+   * {@link #setString(int, String)}.
+   */
+  public StringColumnPreIndexStatsCollector.CLPStats getCLPStats() {
+    return new CLPStatsProvider.CLPStats((String[]) _logTypeDictCreator.getSortedValues(),
+        (String[]) _dictVarsDictCreator.getSortedValues(), _totalNumberOfDictVars, _totalNumberOfEncodedVars,
+        _maxNumberOfEncodedVars);
+  }
+
+  /** Closes both dictionaries and the three forward indexes. */
+  @Override
+  public void close()
+      throws IOException {
+    _logTypeDictCreator.close();
+    _dictVarsDictCreator.close();
+    _logTypeFwdIndex.close();
+    _dictVarsFwdIndex.close();
+    _encodedVarsFwdIndex.close();
+  }
+}
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentDictionaryCreator.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentDictionaryCreator.java
index 96fc96cee4..1dfb65adea 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentDictionaryCreator.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentDictionaryCreator.java
@@ -70,12 +70,21 @@ public class SegmentDictionaryCreator implements 
IndexCreator {
   private Object2IntOpenHashMap<Object> _objectValueToIndexMap;
   private int _numBytesPerEntry = 0;
 
+  /**
+   * Creates a dictionary creator that writes to an explicitly provided dictionary file.
+   *
+   * @param columnName name recorded as the column name of this dictionary
+   * @param storedType stored data type of the dictionary values
+   * @param indexFile file used as the dictionary file
+   * @param useVarLengthDictionary value stored in the var-length dictionary flag
+   */
+  public SegmentDictionaryCreator(String columnName, DataType storedType, File indexFile,
+      boolean useVarLengthDictionary) {
+    _useVarLengthDictionary = useVarLengthDictionary;
+    _dictionaryFile = indexFile;
+    _storedType = storedType;
+    _columnName = columnName;
+  }
+
   /**
    * Creates a dictionary creator for the column described by {@code fieldSpec}. The dictionary file is placed in
    * {@code indexDir} and named after the column followed by the dictionary index file extension.
    *
    * @param fieldSpec field spec providing the column name and stored data type
    * @param indexDir directory in which the dictionary file is created
    * @param useVarLengthDictionary value stored in the var-length dictionary flag
    */
   public SegmentDictionaryCreator(FieldSpec fieldSpec, File indexDir, boolean useVarLengthDictionary) {
     this(fieldSpec.getName(), fieldSpec.getDataType().getStoredType(),
         new File(indexDir, fieldSpec.getName() + DictionaryIndexType.getFileExtension()), useVarLengthDictionary);
   }
+
   @Override
   public void add(@Nonnull Object value, int dictId)
       throws IOException {
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/CLPForwardIndexCreatorV1.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/CLPForwardIndexCreatorV1.java
new file mode 100644
index 0000000000..c681265ffb
--- /dev/null
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/CLPForwardIndexCreatorV1.java
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.creator.impl.fwd;
+
+import com.yscope.clp.compressorfrontend.BuiltInVariableHandlingRuleVersions;
+import com.yscope.clp.compressorfrontend.EncodedMessage;
+import com.yscope.clp.compressorfrontend.MessageEncoder;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.StandardOpenOption;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.segment.local.io.util.PinotDataBitSet;
+import 
org.apache.pinot.segment.local.io.writer.impl.FixedBitMVForwardIndexWriter;
+import 
org.apache.pinot.segment.local.io.writer.impl.FixedBitSVForwardIndexWriter;
+import 
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriterV4;
+import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentDictionaryCreator;
+import 
org.apache.pinot.segment.local.segment.creator.impl.stats.CLPStatsProvider;
+import 
org.apache.pinot.segment.local.segment.creator.impl.stats.StringColumnPreIndexStatsCollector;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.creator.ColumnStatistics;
+import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
+import org.apache.pinot.spi.data.FieldSpec;
+
+
+/**
+ * Writer for CLP forward index.
+ * <p>CLP forward index contains the following parts:
+ * <ul>
+ *   <li>Header bytes: MAGIC_BYTES, version and index metadata (e.g. total number of dictVars)</li>
+ *   <li>LogType dictionary: dictionary for logType column</li>
+ *   <li>DictVars dictionary: dictionary for dictVars column</li>
+ *   <li>LogType fwd index: fwd index for logType column</li>
+ *   <li>DictVars fwd index: fwd index for dictVars column</li>
+ *   <li>EncodedVars fwd index: raw fwd index for encodedVars column</li>
+ * </ul>
+ */
+
+public class CLPForwardIndexCreatorV1 implements ForwardIndexCreator {
+  public static final byte[] MAGIC_BYTES = 
"CLP.v1".getBytes(StandardCharsets.UTF_8);
+  private final String _column;
+  private final int _numDocs;
+  private final File _intermediateFilesDir;
+  private final FileChannel _dataFile;
+  private final ByteBuffer _fileBuffer;
+  private final EncodedMessage _clpEncodedMessage;
+  private final MessageEncoder _clpMessageEncoder;
+  private final StringColumnPreIndexStatsCollector.CLPStats _clpStats;
+  private final SegmentDictionaryCreator _logTypeDictCreator;
+  private final SegmentDictionaryCreator _dictVarsDictCreator;
+  private final FixedBitSVForwardIndexWriter _logTypeFwdIndexWriter;
+  private final FixedBitMVForwardIndexWriter _dictVarsFwdIndexWriter;
+  private final MultiValueFixedByteRawIndexCreator _encodedVarsFwdIndexWriter;
+  private final File _logTypeDictFile;
+  private final File _dictVarsDictFile;
+  private final File _logTypeFwdIndexFile;
+  private final File _dictVarsFwdIndexFile;
+  private final File _encodedVarsFwdIndexFile;
+
+  /**
+   * Prepares everything needed to write the CLP forward index for {@code column}.
+   *
+   * <p>Creates (or empties) a temporary directory under {@code baseIndexDir}, opens and memory-maps the final index
+   * file, builds the logType and dictVars dictionaries from the CLP stats, and creates the logType, dictVars and
+   * encodedVars forward index writers on files inside the temporary directory.
+   *
+   * @param baseIndexDir directory holding the final index file and the temporary directory
+   * @param column name of the column being indexed
+   * @param numDocs number of documents, passed to the forward index writers
+   * @param columnStatistics column statistics; cast to {@link CLPStatsProvider} to obtain the CLP stats
+   * @throws IOException declared for the directory, file and writer set-up performed here
+   */
+  public CLPForwardIndexCreatorV1(File baseIndexDir, String column, int 
numDocs, ColumnStatistics columnStatistics)
+      throws IOException {
+    _column = column;
+    _numDocs = numDocs;
+    // Temporary directory for the per-part files; emptied if it is left over from an earlier run
+    _intermediateFilesDir =
+        new File(baseIndexDir, column + 
V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION + ".clp.tmp");
+    if (_intermediateFilesDir.exists()) {
+      FileUtils.cleanDirectory(_intermediateFilesDir);
+    } else {
+      FileUtils.forceMkdir(_intermediateFilesDir);
+    }
+
+    // Final index file, named like a raw single-value forward index
+    _dataFile =
+        new RandomAccessFile(new File(baseIndexDir, column + 
V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION),
+            "rw").getChannel();
+    // NOTE(review): maps Integer.MAX_VALUE bytes up front; presumably the file is cut down to its real size once
+    // the index is written — confirm
+    _fileBuffer = _dataFile.map(FileChannel.MapMode.READ_WRITE, 0, 
Integer.MAX_VALUE);
+
+    // The cast fails with ClassCastException when the statistics do not implement CLPStatsProvider
+    CLPStatsProvider statsCollector = (CLPStatsProvider) columnStatistics;
+    _clpStats = statsCollector.getCLPStats();
+    _logTypeDictFile = new File(_intermediateFilesDir, _column + 
"_clp_logtype.dict");
+    _logTypeDictCreator =
+        new SegmentDictionaryCreator(_column + "_clp_logtype.dict", 
FieldSpec.DataType.STRING, _logTypeDictFile, true);
+    _logTypeDictCreator.build(_clpStats.getSortedLogTypeValues());
+
+    _dictVarsDictFile = new File(_intermediateFilesDir, _column + 
"_clp_dictvars.dict");
+    _dictVarsDictCreator =
+        new SegmentDictionaryCreator(_column + "_clp_dictvars.dict", 
FieldSpec.DataType.STRING, _dictVarsDictFile,
+            true);
+    _dictVarsDictCreator.build(_clpStats.getSortedDictVarValues());
+
+    // Bit width is derived from the largest dict id (number of sorted values - 1)
+    _logTypeFwdIndexFile = new File(_intermediateFilesDir, column + 
"_clp_logtype.fwd");
+    _logTypeFwdIndexWriter = new 
FixedBitSVForwardIndexWriter(_logTypeFwdIndexFile, numDocs,
+        
PinotDataBitSet.getNumBitsPerValue(_clpStats.getSortedLogTypeValues().length - 
1));
+
+    _dictVarsFwdIndexFile = new File(_intermediateFilesDir, column + 
"_clp_dictvars.fwd");
+    _dictVarsFwdIndexWriter =
+        new FixedBitMVForwardIndexWriter(_dictVarsFwdIndexFile, numDocs, 
_clpStats.getTotalNumberOfDictVars(),
+            
PinotDataBitSet.getNumBitsPerValue(_clpStats.getSortedDictVarValues().length - 
1));
+
+    _encodedVarsFwdIndexFile = new File(_intermediateFilesDir, column + 
"_clp_encodedvars.fwd");
+    _encodedVarsFwdIndexWriter =
+        new MultiValueFixedByteRawIndexCreator(_encodedVarsFwdIndexFile, 
ChunkCompressionType.LZ4, numDocs,
+            FieldSpec.DataType.LONG, _clpStats.getMaxNumberOfEncodedVars(), 
false,
+            VarByteChunkForwardIndexWriterV4.VERSION);
+    // NOTE(review): seal() still reads _clpStats.getTotalNumberOfDictVars() after this call; presumably clear()
+    // only releases the sorted value arrays — confirm
+    _clpStats.clear();
+
+    _clpEncodedMessage = new EncodedMessage();
+    _clpMessageEncoder = new 
MessageEncoder(BuiltInVariableHandlingRuleVersions.VariablesSchemaV2,
+        BuiltInVariableHandlingRuleVersions.VariableEncodingMethodsV1);
+  }
+
+  /** Always {@code false}: values are written through {@link #putString(String)}, not as dictionary ids. */
+  @Override
+  public boolean isDictionaryEncoded() {
+    return false;
+  }
+
+  /** Always {@code true}: this creator writes one string value per document. */
+  @Override
+  public boolean isSingleValue() {
+    return true;
+  }
+
+  /** The value type handled by this creator is always {@code STRING}. */
+  @Override
+  public FieldSpec.DataType getValueType() {
+    return FieldSpec.DataType.STRING;
+  }
+
+  /**
+   * Not supported by this creator.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public void putBigDecimal(BigDecimal value) {
+    throw new UnsupportedOperationException("Non string types are not 
supported");
+  }
+
+  @Override
+  public void putString(String value) {
+    String logtype;
+    String[] dictVars;
+    Long[] encodedVars;
+
+    try {
+      _clpMessageEncoder.encodeMessage(value, _clpEncodedMessage);
+      logtype = _clpEncodedMessage.getLogTypeAsString();
+      dictVars = _clpEncodedMessage.getDictionaryVarsAsStrings();
+      encodedVars = _clpEncodedMessage.getEncodedVarsAsBoxedLongs();
+    } catch (IOException e) {
+      throw new IllegalArgumentException("Failed to encode message: " + value, 
e);
+    }
+
+    if (logtype == null) {
+      logtype = FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_STRING;
+    }
+
+    if (dictVars == null) {
+      dictVars = new 
String[]{FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_STRING};
+    }
+
+    if (encodedVars == null) {
+      encodedVars = new Long[]{FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_LONG};
+    }
+
+    addCLPFields(logtype, dictVars, encodedVars);
+  }
+
+  /**
+   * Writes the components of one encoded message: the logtype and dictionary variables are stored as dict ids
+   * looked up in the internal dictionaries, the encoded variables are stored as raw longs.
+   */
+  private void addCLPFields(String logtype, String[] dictVars, Long[] encodedVars) {
+    // Resolve both dict id lookups before writing anything
+    int logtypeId = _logTypeDictCreator.indexOfSV(logtype);
+    int[] dictVarIds = _dictVarsDictCreator.indexOfMV(dictVars);
+    _logTypeFwdIndexWriter.putDictId(logtypeId);
+    _dictVarsFwdIndexWriter.putDictIds(dictVarIds);
+
+    // The raw index writer takes primitives, so unbox the encoded variables
+    long[] primitiveVars = new long[encodedVars.length];
+    int pos = 0;
+    for (Long encodedVar : encodedVars) {
+      primitiveVars[pos++] = encodedVar;
+    }
+    _encodedVarsFwdIndexWriter.putLongMV(primitiveVars);
+  }
+
+  /**
+   * Finalizes the intermediate dictionaries and forward indexes, then assembles the final index file:
+   * <ol>
+   *   <li>magic bytes</li>
+   *   <li>version (int)</li>
+   *   <li>total number of dictionary variables (int)</li>
+   *   <li>bytes per entry of the logtype dictionary and of the dict-vars dictionary (2 ints)</li>
+   *   <li>lengths of the five sections (5 ints)</li>
+   *   <li>the five sections: logtype dict, dict-vars dict, logtype fwd index, dict-vars fwd index,
+   *       encoded-vars fwd index</li>
+   * </ol>
+   * The data file is finally truncated to the number of bytes actually written.
+   *
+   * @throws IOException if an intermediate file cannot be sealed, closed or copied
+   * @throws ArithmeticException if a section is too large for its length to fit the 32-bit header field
+   */
+  @Override
+  public void seal()
+      throws IOException {
+    // Finish all intermediate files first so that their on-disk lengths are final
+    _logTypeDictCreator.seal();
+    _logTypeDictCreator.close();
+
+    _dictVarsDictCreator.seal();
+    _dictVarsDictCreator.close();
+
+    _logTypeFwdIndexWriter.close();
+    _dictVarsFwdIndexWriter.close();
+    _encodedVarsFwdIndexWriter.close();
+
+    // Sections are appended in exactly this order; the header records their lengths in the same order
+    File[] sections = {
+        _logTypeDictFile, _dictVarsDictFile, _logTypeFwdIndexFile, _dictVarsFwdIndexFile, _encodedVarsFwdIndexFile
+    };
+
+    _fileBuffer.put(MAGIC_BYTES);
+    _fileBuffer.putInt(1); // version
+    _fileBuffer.putInt(_clpStats.getTotalNumberOfDictVars());
+    _fileBuffer.putInt(_logTypeDictCreator.getNumBytesPerEntry());
+    _fileBuffer.putInt(_dictVarsDictCreator.getNumBytesPerEntry());
+    long totalSize = MAGIC_BYTES.length + 4L * Integer.BYTES;
+
+    for (File section : sections) {
+      // Lengths are stored as 32-bit ints: fail loudly instead of silently truncating an oversized section
+      _fileBuffer.putInt(Math.toIntExact(section.length()));
+      totalSize += Integer.BYTES;
+    }
+
+    for (File section : sections) {
+      copyFileIntoBuffer(section);
+      totalSize += section.length();
+    }
+
+    _dataFile.truncate(totalSize);
+  }
+
+  /**
+   * Appends the whole content of {@code file} to {@code _fileBuffer} by mapping the file read-only.
+   *
+   * @throws IOException if the file cannot be opened or mapped
+   */
+  private void copyFileIntoBuffer(File file) throws IOException {
+    try (FileChannel source = FileChannel.open(file.toPath(), StandardOpenOption.READ)) {
+      long numBytes = file.length();
+      _fileBuffer.put(source.map(FileChannel.MapMode.READ_ONLY, 0, numBytes));
+    }
+  }
+
+  /**
+   * Closes the data file and removes the directory holding the intermediate files.
+   *
+   * @throws IOException if closing the data file or deleting the intermediate directory fails
+   */
+  @Override
+  public void close()
+      throws IOException {
+    try {
+      _dataFile.close();
+    } finally {
+      // Delete all temp files even when closing the data file fails, so they are never leaked
+      FileUtils.deleteDirectory(_intermediateFilesDir);
+    }
+  }
+}
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java
index a1ba59f3b2..d459db1f6f 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java
@@ -73,15 +73,22 @@ public class MultiValueFixedByteRawIndexCreator implements 
ForwardIndexCreator {
       int totalDocs, DataType valueType, int maxNumberOfMultiValueElements, 
boolean deriveNumDocsPerChunk,
       int writerVersion)
       throws IOException {
-    File file = new File(baseIndexDir, column + 
Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION);
+    this(new File(baseIndexDir, column + 
Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION), compressionType, totalDocs,
+        valueType, maxNumberOfMultiValueElements, deriveNumDocsPerChunk, 
writerVersion);
+  }
+
+  public MultiValueFixedByteRawIndexCreator(File indexFile, 
ChunkCompressionType compressionType, int totalDocs,
+      DataType valueType, int maxNumberOfMultiValueElements, boolean 
deriveNumDocsPerChunk, int writerVersion)
+      throws IOException {
     // Store the length followed by the values
     int totalMaxLength = Integer.BYTES + (maxNumberOfMultiValueElements * 
valueType.getStoredType().size());
     int numDocsPerChunk = deriveNumDocsPerChunk ? Math.max(
         TARGET_MAX_CHUNK_SIZE / (totalMaxLength + 
VarByteChunkForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE), 1)
         : DEFAULT_NUM_DOCS_PER_CHUNK;
-    _indexWriter = writerVersion < VarByteChunkForwardIndexWriterV4.VERSION ? 
new VarByteChunkForwardIndexWriter(file,
-        compressionType, totalDocs, numDocsPerChunk, totalMaxLength, 
writerVersion)
-        : new VarByteChunkForwardIndexWriterV4(file, compressionType, 
TARGET_MAX_CHUNK_SIZE);
+    _indexWriter =
+        writerVersion < VarByteChunkForwardIndexWriterV4.VERSION ? new 
VarByteChunkForwardIndexWriter(indexFile,
+            compressionType, totalDocs, numDocsPerChunk, totalMaxLength, 
writerVersion)
+            : new VarByteChunkForwardIndexWriterV4(indexFile, compressionType, 
TARGET_MAX_CHUNK_SIZE);
     _valueType = valueType;
   }
 
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/AbstractColumnStatisticsCollector.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/AbstractColumnStatisticsCollector.java
index 5af1e14999..41704b9bd4 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/AbstractColumnStatisticsCollector.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/AbstractColumnStatisticsCollector.java
@@ -27,6 +27,7 @@ import org.apache.pinot.segment.spi.creator.ColumnStatistics;
 import org.apache.pinot.segment.spi.creator.StatsCollectorConfig;
 import org.apache.pinot.segment.spi.partition.PartitionFunction;
 import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory;
+import org.apache.pinot.spi.config.table.FieldConfig;
 import org.apache.pinot.spi.data.FieldSpec;
 
 
@@ -44,6 +45,7 @@ import org.apache.pinot.spi.data.FieldSpec;
 public abstract class AbstractColumnStatisticsCollector implements 
ColumnStatistics {
   protected static final int INITIAL_HASH_SET_SIZE = 1000;
   protected final FieldSpec _fieldSpec;
+  protected final FieldConfig _fieldConfig;
 
   private final Map<String, String> _partitionFunctionConfig;
   private final PartitionFunction _partitionFunction;
@@ -57,6 +59,7 @@ public abstract class AbstractColumnStatisticsCollector 
implements ColumnStatist
 
   public AbstractColumnStatisticsCollector(String column, StatsCollectorConfig 
statsCollectorConfig) {
     _fieldSpec = statsCollectorConfig.getFieldSpecForColumn(column);
+    _fieldConfig = statsCollectorConfig.getFieldConfigForColumn(column);
     Preconditions.checkArgument(_fieldSpec != null, "Failed to find column: 
%s", column);
     if (!_fieldSpec.isSingleValueField()) {
       _sorted = false;
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/CLPStatsProvider.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/CLPStatsProvider.java
new file mode 100644
index 0000000000..b861188620
--- /dev/null
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/CLPStatsProvider.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.creator.impl.stats;
+
+/**
+ * Implemented by column statistics that can additionally expose CLP specific statistics.
+ */
+public interface CLPStatsProvider {
+
+  /** Returns the CLP statistics of the column. */
+  CLPStats getCLPStats();
+
+  /**
+   * Holder for CLP statistics: the sorted distinct logtypes and dictionary variables, plus the counters for
+   * dictionary and encoded variables.
+   */
+  class CLPStats {
+    int _totalNumberOfDictVars;
+    int _totalNumberOfEncodedVars;
+    int _maxNumberOfEncodedVars;
+    private String[] _sortedLogTypeValues;
+    private String[] _sortedDictVarValues;
+
+    public CLPStats(String[] sortedLogTypeValues, String[] sortedDictVarValues, int totalNumberOfDictVars,
+        int totalNumberOfEncodedVars, int maxNumberOfEncodedVars) {
+      _totalNumberOfDictVars = totalNumberOfDictVars;
+      _totalNumberOfEncodedVars = totalNumberOfEncodedVars;
+      _maxNumberOfEncodedVars = maxNumberOfEncodedVars;
+      _sortedLogTypeValues = sortedLogTypeValues;
+      _sortedDictVarValues = sortedDictVarValues;
+    }
+
+    /**
+     * Drops the references to both sorted value arrays. The counters are left untouched; the sorted value
+     * getters return {@code null} afterwards.
+     */
+    public void clear() {
+      _sortedDictVarValues = null;
+      _sortedLogTypeValues = null;
+    }
+
+    /** Sorted logtype values, or {@code null} once {@link #clear()} has been called. */
+    public String[] getSortedLogTypeValues() {
+      return _sortedLogTypeValues;
+    }
+
+    /** Sorted dictionary variable values, or {@code null} once {@link #clear()} has been called. */
+    public String[] getSortedDictVarValues() {
+      return _sortedDictVarValues;
+    }
+
+    /** Total number of dictionary variables. */
+    public int getTotalNumberOfDictVars() {
+      return _totalNumberOfDictVars;
+    }
+
+    /** Total number of encoded variables. */
+    public int getTotalNumberOfEncodedVars() {
+      return _totalNumberOfEncodedVars;
+    }
+
+    /** Maximum number of encoded variables. */
+    public int getMaxNumberOfEncodedVars() {
+      return _maxNumberOfEncodedVars;
+    }
+  }
+}
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/StringColumnPreIndexStatsCollector.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/StringColumnPreIndexStatsCollector.java
index b896ac30ff..d606c784d2 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/StringColumnPreIndexStatsCollector.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/StringColumnPreIndexStatsCollector.java
@@ -18,24 +18,35 @@
  */
 package org.apache.pinot.segment.local.segment.creator.impl.stats;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.yscope.clp.compressorfrontend.BuiltInVariableHandlingRuleVersions;
+import com.yscope.clp.compressorfrontend.EncodedMessage;
+import com.yscope.clp.compressorfrontend.MessageEncoder;
 import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet;
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.Set;
 import org.apache.pinot.segment.spi.creator.StatsCollectorConfig;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.data.FieldSpec;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 
 
-public class StringColumnPreIndexStatsCollector extends 
AbstractColumnStatisticsCollector {
+public class StringColumnPreIndexStatsCollector extends 
AbstractColumnStatisticsCollector implements CLPStatsProvider {
   private Set<String> _values = new ObjectOpenHashSet<>(INITIAL_HASH_SET_SIZE);
   private int _minLength = Integer.MAX_VALUE;
   private int _maxLength = 0;
   private int _maxRowLength = 0;
   private String[] _sortedValues;
   private boolean _sealed = false;
+  private CLPStatsCollector _clpStatsCollector;
 
   public StringColumnPreIndexStatsCollector(String column, 
StatsCollectorConfig statsCollectorConfig) {
     super(column, statsCollectorConfig);
+    if (_fieldConfig != null && _fieldConfig.getCompressionCodec() == 
FieldConfig.CompressionCodec.CLP) {
+      _clpStatsCollector = new CLPStatsCollector();
+    }
   }
 
   @Override
@@ -48,6 +59,9 @@ public class StringColumnPreIndexStatsCollector extends 
AbstractColumnStatistics
       for (Object obj : values) {
         String value = (String) obj;
         _values.add(value);
+        if (_clpStatsCollector != null) {
+          _clpStatsCollector.collect(value);
+        }
 
         int length = value.getBytes(UTF_8).length;
         _minLength = Math.min(_minLength, length);
@@ -61,6 +75,9 @@ public class StringColumnPreIndexStatsCollector extends 
AbstractColumnStatistics
     } else {
       String value = (String) entry;
       addressSorted(value);
+      if (_clpStatsCollector != null) {
+        _clpStatsCollector.collect(value);
+      }
       if (_values.add(value)) {
         if (isPartitionEnabled()) {
           updatePartition(value);
@@ -74,6 +91,14 @@ public class StringColumnPreIndexStatsCollector extends 
AbstractColumnStatistics
     }
   }
 
+  /**
+   * Returns the CLP statistics gathered for this column.
+   *
+   * @throws IllegalStateException if the collector has not been sealed yet, or if no CLP statistics were
+   *     collected because the column is not configured with the CLP compression codec
+   */
+  @Override
+  public CLPStats getCLPStats() {
+    if (!_sealed) {
+      throw new IllegalStateException("The collector must be sealed before calling getCLPStats");
+    }
+    // The CLP collector only exists for CLP columns; report that clearly rather than with a bare NPE
+    if (_clpStatsCollector == null) {
+      throw new IllegalStateException("CLP stats are not collected: the column compression codec is not CLP");
+    }
+    return _clpStatsCollector.getCLPStats();
+  }
+
   @Override
   public String getMinValue() {
     if (_sealed) {
@@ -125,7 +150,77 @@ public class StringColumnPreIndexStatsCollector extends 
AbstractColumnStatistics
       _sortedValues = _values.toArray(new String[0]);
       _values = null;
       Arrays.sort(_sortedValues);
+      if (_clpStatsCollector != null) {
+        _clpStatsCollector.seal();
+      }
       _sealed = true;
     }
   }
+
+  /**
+   * Gathers CLP statistics by encoding every collected value: the distinct logtypes and dictionary variables,
+   * the total number of dictionary / encoded variables and the maximum number of encoded variables found in a
+   * single value. {@link #seal()} turns the gathered state into a {@link CLPStats}.
+   */
+  @VisibleForTesting
+  public static class CLPStatsCollector {
+    private final EncodedMessage _clpEncodedMessage;
+    private final MessageEncoder _clpMessageEncoder;
+    int _totalNumberOfDictVars = 0;
+    int _totalNumberOfEncodedVars = 0;
+    int _maxNumberOfEncodedVars = 0;
+    private Set<String> _logTypes = new ObjectOpenHashSet<>(AbstractColumnStatisticsCollector.INITIAL_HASH_SET_SIZE);
+    private Set<String> _dictVars = new ObjectOpenHashSet<>(AbstractColumnStatisticsCollector.INITIAL_HASH_SET_SIZE);
+    private CLPStats _clpStats;
+
+    public CLPStatsCollector() {
+      _clpEncodedMessage = new EncodedMessage();
+      _clpMessageEncoder = new MessageEncoder(BuiltInVariableHandlingRuleVersions.VariablesSchemaV2,
+          BuiltInVariableHandlingRuleVersions.VariableEncodingMethodsV1);
+    }
+
+    /**
+     * Encodes {@code value} and folds its components into the running statistics. Components the encoder
+     * reports as {@code null} are replaced by the matching default null value before being counted.
+     *
+     * @throws IllegalArgumentException if encoding fails with an {@link IOException}
+     */
+    public void collect(String value) {
+      String encodedLogType;
+      String[] encodedDictVars;
+      Long[] encodedLongVars;
+      try {
+        _clpMessageEncoder.encodeMessage(value, _clpEncodedMessage);
+        encodedLogType = _clpEncodedMessage.getLogTypeAsString();
+        encodedDictVars = _clpEncodedMessage.getDictionaryVarsAsStrings();
+        encodedLongVars = _clpEncodedMessage.getEncodedVarsAsBoxedLongs();
+      } catch (IOException e) {
+        throw new IllegalArgumentException("Failed to encode message: " + value, e);
+      }
+
+      String logType = encodedLogType != null ? encodedLogType : FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_STRING;
+      String[] dictVars =
+          encodedDictVars != null ? encodedDictVars : new String[]{FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_STRING};
+      Long[] encodedVars =
+          encodedLongVars != null ? encodedLongVars : new Long[]{FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_LONG};
+
+      _logTypes.add(logType);
+      for (String dictVar : dictVars) {
+        _dictVars.add(dictVar);
+      }
+      _totalNumberOfDictVars += dictVars.length;
+      _totalNumberOfEncodedVars += encodedVars.length;
+      if (encodedVars.length > _maxNumberOfEncodedVars) {
+        _maxNumberOfEncodedVars = encodedVars.length;
+      }
+    }
+
+    /**
+     * Sorts the distinct logtypes and dictionary variables, releases the backing sets and builds the
+     * {@link CLPStats} returned by {@link #getCLPStats()}.
+     */
+    public void seal() {
+      String[] logTypesSorted = toSortedArray(_logTypes);
+      _logTypes = null;
+      String[] dictVarsSorted = toSortedArray(_dictVars);
+      _dictVars = null;
+      _clpStats = new CLPStats(logTypesSorted, dictVarsSorted, _totalNumberOfDictVars, _totalNumberOfEncodedVars,
+          _maxNumberOfEncodedVars);
+    }
+
+    /** Copies the set into a new array sorted in natural string order. */
+    private static String[] toSortedArray(Set<String> values) {
+      String[] sorted = values.toArray(new String[0]);
+      Arrays.sort(sorted);
+      return sorted;
+    }
+
+    /** Returns the stats built by {@link #seal()}, or {@code null} if the collector has not been sealed. */
+    public CLPStats getCLPStats() {
+      return _clpStats;
+    }
+  }
 }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexCreatorFactory.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexCreatorFactory.java
index ded5000b0c..cd0417a258 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexCreatorFactory.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexCreatorFactory.java
@@ -21,6 +21,7 @@ package org.apache.pinot.segment.local.segment.index.forward;
 
 import java.io.File;
 import java.io.IOException;
+import 
org.apache.pinot.segment.local.segment.creator.impl.fwd.CLPForwardIndexCreatorV1;
 import 
org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueEntryDictForwardIndexCreator;
 import 
org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueFixedByteRawIndexCreator;
 import 
org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueUnsortedForwardIndexCreator;
@@ -34,6 +35,7 @@ import 
org.apache.pinot.segment.spi.compression.DictIdCompressionType;
 import org.apache.pinot.segment.spi.creator.IndexCreationContext;
 import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
 import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
+import org.apache.pinot.spi.config.table.FieldConfig;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 
@@ -69,6 +71,9 @@ public class ForwardIndexCreatorFactory {
     } else {
       // Dictionary disabled columns
       DataType storedType = fieldSpec.getDataType().getStoredType();
+      if (indexConfig.getCompressionCodec() == 
FieldConfig.CompressionCodec.CLP) {
+        return new CLPForwardIndexCreatorV1(indexDir, columnName, 
numTotalDocs, context.getColumnStatistics());
+      }
       ChunkCompressionType chunkCompressionType = 
indexConfig.getChunkCompressionType();
       if (chunkCompressionType == null) {
         chunkCompressionType = 
ForwardIndexType.getDefaultCompressionType(fieldSpec.getFieldType());
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexReaderFactory.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexReaderFactory.java
index b5ca2b83c1..86a1038dad 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexReaderFactory.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexReaderFactory.java
@@ -19,7 +19,10 @@
 
 package org.apache.pinot.segment.local.segment.index.forward;
 
+import org.apache.commons.lang.ArrayUtils;
 import 
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriterV4;
+import 
org.apache.pinot.segment.local.segment.creator.impl.fwd.CLPForwardIndexCreatorV1;
+import 
org.apache.pinot.segment.local.segment.index.readers.forward.CLPForwardIndexReaderV1;
 import 
org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitMVEntryDictForwardIndexReader;
 import 
org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitMVForwardIndexReader;
 import 
org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitSVForwardIndexReaderV2;
@@ -83,6 +86,13 @@ public class ForwardIndexReaderFactory extends 
IndexReaderFactory.Default<Forwar
         }
       }
     } else {
+      if (dataBuffer.size() >= CLPForwardIndexCreatorV1.MAGIC_BYTES.length) {
+        byte[] magicBytes = new 
byte[CLPForwardIndexCreatorV1.MAGIC_BYTES.length];
+        dataBuffer.copyTo(0, magicBytes);
+        if (ArrayUtils.isEquals(magicBytes, 
CLPForwardIndexCreatorV1.MAGIC_BYTES)) {
+          return new CLPForwardIndexReaderV1(dataBuffer, 
metadata.getTotalDocs());
+        }
+      }
       return createRawIndexReader(dataBuffer, 
metadata.getDataType().getStoredType(), metadata.isSingleValue());
     }
   }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexType.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexType.java
index a454bf9cfb..4172a6bdbb 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexType.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexType.java
@@ -29,6 +29,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import javax.annotation.Nullable;
+import 
org.apache.pinot.segment.local.realtime.impl.forward.CLPMutableForwardIndex;
 import 
org.apache.pinot.segment.local.realtime.impl.forward.FixedByteMVMutableForwardIndex;
 import 
org.apache.pinot.segment.local.realtime.impl.forward.FixedByteSVMutableForwardIndex;
 import 
org.apache.pinot.segment.local.realtime.impl.forward.VarByteSVMutableForwardIndex;
@@ -68,7 +69,7 @@ public class ForwardIndexType extends 
AbstractIndexType<ForwardIndexConfig, Forw
   public static final String INDEX_DISPLAY_NAME = "forward";
   // For multi-valued column, forward-index.
   // Maximum number of multi-values per row. We assert on this.
-  private static final int MAX_MULTI_VALUES_PER_ROW = 1000;
+  public static final int MAX_MULTI_VALUES_PER_ROW = 1000;
   private static final int 
NODICT_VARIABLE_WIDTH_ESTIMATED_AVERAGE_VALUE_LENGTH_DEFAULT = 100;
   private static final int 
NODICT_VARIABLE_WIDTH_ESTIMATED_NUMBER_OF_VALUES_DEFAULT = 100_000;
   private static final List<String> EXTENSIONS = Lists.newArrayList(
@@ -283,6 +284,9 @@ public class ForwardIndexType extends 
AbstractIndexType<ForwardIndexConfig, Forw
           // Use a smaller capacity as opposed to segment flush size
           int initialCapacity = Math.min(context.getCapacity(),
               NODICT_VARIABLE_WIDTH_ESTIMATED_NUMBER_OF_VALUES_DEFAULT);
+          if (config.getCompressionCodec() == CompressionCodec.CLP) {
+            return new CLPMutableForwardIndex(column, storedType, 
context.getMemoryManager(), context.getCapacity());
+          }
           return new VarByteSVMutableForwardIndex(storedType, 
context.getMemoryManager(), allocationContext,
               initialCapacity, 
NODICT_VARIABLE_WIDTH_ESTIMATED_AVERAGE_VALUE_LENGTH_DEFAULT);
         }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/CLPForwardIndexReaderV1.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/CLPForwardIndexReaderV1.java
new file mode 100644
index 0000000000..d68a80ae20
--- /dev/null
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/CLPForwardIndexReaderV1.java
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.readers.forward;
+
+import com.yscope.clp.compressorfrontend.BuiltInVariableHandlingRuleVersions;
+import com.yscope.clp.compressorfrontend.MessageDecoder;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.pinot.segment.local.io.util.PinotDataBitSet;
+import org.apache.pinot.segment.local.io.util.VarLengthValueReader;
+import 
org.apache.pinot.segment.local.segment.creator.impl.fwd.CLPForwardIndexCreatorV1;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.apache.pinot.spi.data.FieldSpec;
+
+public class CLPForwardIndexReaderV1 implements 
ForwardIndexReader<CLPForwardIndexReaderV1.CLPReaderContext> {
+  private final int _version;
+  private final int _numDocs;
+  private final int _totalDictVarValues;
+  private final int _logTypeDictNumBytesPerValue;
+  private final int _dictVarsDictNumBytesPerValue;
+  private final int _logTypeDictReaderStartOffset;
+  private final VarLengthValueReader _logTypeDictReader;
+  private final int _dictVarsDictReaderStartOffset;
+  private final VarLengthValueReader _dictVarsDictReader;
+  private final int _logTypeFwdIndexReaderStartOffset;
+  private final FixedBitSVForwardIndexReader _logTypeFwdIndexReader;
+  private final int _dictVarsFwdIndexReaderStartOffset;
+  private final FixedBitMVForwardIndexReader _dictVarsFwdIndexReader;
+  private final int _encodedVarFwdIndexReaderStartOffset;
+  private final VarByteChunkForwardIndexReaderV4 _encodedVarFwdIndexReader;
+  private final MessageDecoder _clpMessageDecoder;
+
+  public CLPForwardIndexReaderV1(PinotDataBuffer pinotDataBuffer, int numDocs) 
{
+    _numDocs = numDocs;
+    int offset = CLPForwardIndexCreatorV1.MAGIC_BYTES.length;
+    _version = pinotDataBuffer.getInt(offset);
+    offset += 4;
+    _totalDictVarValues = pinotDataBuffer.getInt(offset);
+    offset += 4;
+    _logTypeDictNumBytesPerValue = pinotDataBuffer.getInt(offset);
+    offset += 4;
+    _dictVarsDictNumBytesPerValue = pinotDataBuffer.getInt(offset);
+    offset += 4;
+
+    int logTypeDictLength = pinotDataBuffer.getInt(offset);
+    offset += 4;
+    int dictVarDictLength = pinotDataBuffer.getInt(offset);
+    offset += 4;
+    int logTypeFwdIndexLength = pinotDataBuffer.getInt(offset);
+    offset += 4;
+    int dictVarsFwdIndexLength = pinotDataBuffer.getInt(offset);
+    offset += 4;
+    int encodedVarFwdIndexLength = pinotDataBuffer.getInt(offset);
+    offset += 4;
+
+    _logTypeDictReaderStartOffset = offset;
+    _logTypeDictReader = new VarLengthValueReader(pinotDataBuffer.view(offset, 
offset + logTypeDictLength));
+    offset += logTypeDictLength;
+
+    _dictVarsDictReaderStartOffset = offset;
+    _dictVarsDictReader = new 
VarLengthValueReader(pinotDataBuffer.view(offset, offset + dictVarDictLength));
+    offset += dictVarDictLength;
+
+    _logTypeFwdIndexReaderStartOffset = offset;
+    _logTypeFwdIndexReader =
+        new FixedBitSVForwardIndexReader(pinotDataBuffer.view(offset, offset + 
logTypeFwdIndexLength), _numDocs,
+            
PinotDataBitSet.getNumBitsPerValue(_logTypeDictReader.getNumValues() - 1));
+    offset += logTypeFwdIndexLength;
+
+    _dictVarsFwdIndexReaderStartOffset = offset;
+    _dictVarsFwdIndexReader =
+        new FixedBitMVForwardIndexReader(pinotDataBuffer.view(offset, offset + 
dictVarsFwdIndexLength), _numDocs,
+            _totalDictVarValues, 
PinotDataBitSet.getNumBitsPerValue(_dictVarsDictReader.getNumValues() - 1));
+    offset += dictVarsFwdIndexLength;
+
+    _encodedVarFwdIndexReaderStartOffset = offset;
+    _encodedVarFwdIndexReader =
+        new VarByteChunkForwardIndexReaderV4(pinotDataBuffer.view(offset, 
offset + encodedVarFwdIndexLength),
+            FieldSpec.DataType.LONG, false);
+    offset += encodedVarFwdIndexLength;
+
+    _clpMessageDecoder = new 
MessageDecoder(BuiltInVariableHandlingRuleVersions.VariablesSchemaV2,
+        BuiltInVariableHandlingRuleVersions.VariableEncodingMethodsV1);
+  }
+
+  @Override
+  public boolean isDictionaryEncoded() {
+    return false;
+  }
+
+  @Override
+  public boolean isSingleValue() {
+    return true;
+  }
+
+  @Override
+  public FieldSpec.DataType getStoredType() {
+    return FieldSpec.DataType.STRING;
+  }
+
+  @Override
+  public String getString(int docId, CLPReaderContext context) {
+    int logTypeDictId = _logTypeFwdIndexReader.getDictId(docId, 
context._logTypeReaderContext);
+    String logType = _logTypeDictReader.getUnpaddedString(logTypeDictId, 
_logTypeDictNumBytesPerValue,
+        new byte[_logTypeDictNumBytesPerValue]);
+    int[] dictVarsDictIds = _dictVarsFwdIndexReader.getDictIdMV(docId, 
context._dictVarsReaderContext);
+
+    String[] dictVars = new String[dictVarsDictIds.length];
+    for (int i = 0; i < dictVarsDictIds.length; i++) {
+      dictVars[i] = _dictVarsDictReader.getUnpaddedString(dictVarsDictIds[i], 
_dictVarsDictNumBytesPerValue,
+          new byte[_dictVarsDictNumBytesPerValue]);
+    }
+    long[] encodedVar = _encodedVarFwdIndexReader.getLongMV(docId, 
context._encodedVarReaderContext);
+
+    try {
+      return _clpMessageDecoder.decodeMessage(logType, dictVars, encodedVar);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+  }
+
+  @Override
+  public CLPReaderContext createContext() {
+    return new CLPReaderContext(_dictVarsFwdIndexReader.createContext(), 
_logTypeFwdIndexReader.createContext(),
+        _encodedVarFwdIndexReader.createContext());
+  }
+
+  @Override
+  public boolean isBufferByteRangeInfoSupported() {
+    return true;
+  }
+
+  @Override
+  public void recordDocIdByteRanges(int docId, CLPReaderContext context, 
List<ByteRange> ranges) {
+    int logTypeDictId = _logTypeFwdIndexReader.getDictId(docId, 
context._logTypeReaderContext);
+    ranges.add(new ByteRange(_logTypeFwdIndexReaderStartOffset + 
_logTypeFwdIndexReader.getRawDataStartOffset()
+        + (long) _logTypeFwdIndexReader.getDocLength() * docId, 
_logTypeFwdIndexReader.getDocLength()));
+    _logTypeDictReader.recordOffsetRanges(logTypeDictId, 
_logTypeDictReaderStartOffset, ranges);
+
+    int[] dictVarsDictIds = _dictVarsFwdIndexReader.getDictIdMV(docId, 
context._dictVarsReaderContext);
+    List<ByteRange> fwdIndexByteRanges = new ArrayList<>();
+    _dictVarsFwdIndexReader.recordDocIdByteRanges(docId, 
context._dictVarsReaderContext, fwdIndexByteRanges);
+    for (ByteRange range : fwdIndexByteRanges) {
+      ranges.add(new ByteRange(_dictVarsFwdIndexReaderStartOffset + 
range.getOffset(), range.getSizeInBytes()));
+    }
+    fwdIndexByteRanges.clear();
+
+    for (int dictVarsDictId : dictVarsDictIds) {
+      _dictVarsDictReader.recordOffsetRanges(dictVarsDictId, 
_dictVarsDictReaderStartOffset, ranges);
+    }
+    _encodedVarFwdIndexReader.recordDocIdByteRanges(docId, 
context._encodedVarReaderContext, fwdIndexByteRanges);
+    for (ByteRange range : fwdIndexByteRanges) {
+      ranges.add(new ByteRange(_encodedVarFwdIndexReaderStartOffset + 
range.getOffset(), range.getSizeInBytes()));
+    }
+  }
+
+  @Override
+  public boolean isFixedOffsetMappingType() {
+    return false;
+  }
+
+  @Override
+  public ChunkCompressionType getCompressionType() {
+    return ChunkCompressionType.PASS_THROUGH;
+  }
+
+  public static final class CLPReaderContext implements 
ForwardIndexReaderContext {
+    private final FixedBitMVForwardIndexReader.Context _dictVarsReaderContext;
+    private final ForwardIndexReaderContext _logTypeReaderContext;
+    private final VarByteChunkForwardIndexReaderV4.ReaderContext 
_encodedVarReaderContext;
+
+    public CLPReaderContext(FixedBitMVForwardIndexReader.Context 
dictVarsReaderContext,
+        ForwardIndexReaderContext logTypeReaderContext,
+        VarByteChunkForwardIndexReaderV4.ReaderContext 
encodedVarReaderContext) {
+      _dictVarsReaderContext = dictVarsReaderContext;
+      _logTypeReaderContext = logTypeReaderContext;
+      _encodedVarReaderContext = encodedVarReaderContext;
+    }
+
+    @Override
+    public void close()
+        throws IOException {
+      if (_dictVarsReaderContext != null) {
+        _dictVarsReaderContext.close();
+      }
+      if (_logTypeReaderContext != null) {
+        _logTypeReaderContext.close();
+      }
+      if (_encodedVarReaderContext != null) {
+        _encodedVarReaderContext.close();
+      }
+    }
+  }
+}
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index 43fd69488b..6c83ae9cb8 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -1192,8 +1192,14 @@ public final class TableConfigUtils {
       CompressionCodec compressionCodec = fieldConfig.getCompressionCodec();
       switch (encodingType) {
         case RAW:
-          Preconditions.checkArgument(compressionCodec == null || 
compressionCodec.isApplicableToRawIndex(),
-              "Compression codec: %s is not applicable to raw index", 
compressionCodec);
+          Preconditions.checkArgument(compressionCodec == null || 
compressionCodec.isApplicableToRawIndex()
+                  || compressionCodec == CompressionCodec.CLP, "Compression 
codec: %s is not applicable to raw index",
+              compressionCodec);
+          if (compressionCodec == CompressionCodec.CLP && schema != null) {
+            Preconditions.checkArgument(
+                
schema.getFieldSpecFor(columnName).getDataType().getStoredType() == 
DataType.STRING,
+                "CLP compression codec can only be applied to string columns");
+          }
           break;
         case DICTIONARY:
           Preconditions.checkArgument(compressionCodec == null || 
compressionCodec.isApplicableToDictEncodedIndex(),
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/DictionariesTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/DictionariesTest.java
index 69bc85551c..a678be4269 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/DictionariesTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/DictionariesTest.java
@@ -20,9 +20,12 @@ package org.apache.pinot.segment.local.segment.creator;
 
 import java.io.File;
 import java.math.BigDecimal;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -54,6 +57,7 @@ import 
org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
 import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver;
 import org.apache.pinot.segment.spi.creator.StatsCollectorConfig;
 import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.spi.config.table.FieldConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.DimensionFieldSpec;
 import org.apache.pinot.spi.data.FieldSpec;
@@ -149,29 +153,29 @@ public class DictionariesTest {
           Assert.assertTrue(heapDictionary instanceof IntDictionary);
           Assert.assertTrue(mmapDictionary instanceof IntDictionary);
           int firstInt = heapDictionary.getIntValue(0);
-          Assert.assertEquals(heapDictionary.indexOf(firstInt), 
heapDictionary.indexOf("" + firstInt));
-          Assert.assertEquals(mmapDictionary.indexOf(firstInt), 
mmapDictionary.indexOf("" + firstInt));
+          Assert.assertEquals(heapDictionary.indexOf(firstInt), 
heapDictionary.indexOf(String.valueOf(firstInt)));
+          Assert.assertEquals(mmapDictionary.indexOf(firstInt), 
mmapDictionary.indexOf(String.valueOf(firstInt)));
           break;
         case LONG:
           Assert.assertTrue(heapDictionary instanceof LongDictionary);
           Assert.assertTrue(mmapDictionary instanceof LongDictionary);
           long firstLong = heapDictionary.getLongValue(0);
-          Assert.assertEquals(heapDictionary.indexOf(firstLong), 
heapDictionary.indexOf("" + firstLong));
-          Assert.assertEquals(mmapDictionary.indexOf(firstLong), 
mmapDictionary.indexOf("" + firstLong));
+          Assert.assertEquals(heapDictionary.indexOf(firstLong), 
heapDictionary.indexOf(String.valueOf(firstLong)));
+          Assert.assertEquals(mmapDictionary.indexOf(firstLong), 
mmapDictionary.indexOf(String.valueOf(firstLong)));
           break;
         case FLOAT:
           Assert.assertTrue(heapDictionary instanceof FloatDictionary);
           Assert.assertTrue(mmapDictionary instanceof FloatDictionary);
           float firstFloat = heapDictionary.getFloatValue(0);
-          Assert.assertEquals(heapDictionary.indexOf(firstFloat), 
heapDictionary.indexOf("" + firstFloat));
-          Assert.assertEquals(mmapDictionary.indexOf(firstFloat), 
mmapDictionary.indexOf("" + firstFloat));
+          Assert.assertEquals(heapDictionary.indexOf(firstFloat), 
heapDictionary.indexOf(String.valueOf(firstFloat)));
+          Assert.assertEquals(mmapDictionary.indexOf(firstFloat), 
mmapDictionary.indexOf(String.valueOf(firstFloat)));
           break;
         case DOUBLE:
           Assert.assertTrue(heapDictionary instanceof DoubleDictionary);
           Assert.assertTrue(mmapDictionary instanceof DoubleDictionary);
           double firstDouble = heapDictionary.getDoubleValue(0);
-          Assert.assertEquals(heapDictionary.indexOf(firstDouble), 
heapDictionary.indexOf("" + firstDouble));
-          Assert.assertEquals(mmapDictionary.indexOf(firstDouble), 
mmapDictionary.indexOf("" + firstDouble));
+          Assert.assertEquals(heapDictionary.indexOf(firstDouble), 
heapDictionary.indexOf(String.valueOf(firstDouble)));
+          Assert.assertEquals(mmapDictionary.indexOf(firstDouble), 
mmapDictionary.indexOf(String.valueOf(firstDouble)));
           break;
         case BIG_DECIMAL:
           Assert.assertTrue(heapDictionary instanceof BigDecimalDictionary);
@@ -530,4 +534,35 @@ public class DictionariesTest {
         throw new IllegalArgumentException("Illegal data type for stats 
builder: " + dataType);
     }
   }
+
+  /**
+   * Verifies that {@link StringColumnPreIndexStatsCollector} produces CLP stats for a RAW column configured with the
+   * CLP compression codec, and that two log lines sharing the same format yield a single log type.
+   */
+  @Test
+  public void clpStatsCollectorTest() {
+    Schema schema = new Schema();
+    schema.addField(new DimensionFieldSpec("column1", DataType.STRING, true));
+
+    // Typed factories instead of the raw EMPTY_LIST / EMPTY_MAP constants avoid unchecked conversions.
+    List<FieldConfig> fieldConfigList = new ArrayList<>();
+    fieldConfigList.add(new FieldConfig("column1", FieldConfig.EncodingType.RAW, Collections.emptyList(),
+        FieldConfig.CompressionCodec.CLP, Collections.emptyMap()));
+    // NOTE(review): this mutates the _tableConfig field of the test class - confirm no other test depends on its
+    // field config list.
+    _tableConfig.setFieldConfigList(fieldConfigList);
+    StatsCollectorConfig statsCollectorConfig = new StatsCollectorConfig(_tableConfig, schema, null);
+    StringColumnPreIndexStatsCollector statsCollector =
+        new StringColumnPreIndexStatsCollector("column1", statsCollectorConfig);
+
+    List<String> logLines = Arrays.asList(
+        "2023/10/26 00:03:10.168 INFO [PropertyCache] [HelixController-pipeline-default-pinot-(4a02a32c_DEFAULT)] "
+            + "Event pinot::DEFAULT::4a02a32c_DEFAULT : Refreshed 35 property LiveInstance took 5 ms. Selective: true",
+        "2023/10/26 00:03:10.169 INFO [PropertyCache] [HelixController-pipeline-default-pinot-(4a02a32d_DEFAULT)] "
+            + "Event pinot::DEFAULT::4a02a32d_DEFAULT : Refreshed 81 property LiveInstance took 4 ms. Selective: true");
+
+    for (String logLine : logLines) {
+      statsCollector.collect(logLine);
+    }
+    statsCollector.seal();
+
+    Assert.assertNotNull(statsCollector.getCLPStats());
+
+    // Same log line format
+    Assert.assertEquals(statsCollector.getCLPStats().getSortedLogTypeValues().length, 1);
+  }
 }
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/CLPForwardIndexCreatorTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/CLPForwardIndexCreatorTest.java
new file mode 100644
index 0000000000..c97a1d318f
--- /dev/null
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/CLPForwardIndexCreatorTest.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.creator;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.commons.io.FileUtils;
+import 
org.apache.pinot.segment.local.segment.creator.impl.fwd.CLPForwardIndexCreatorV1;
+import 
org.apache.pinot.segment.local.segment.creator.impl.stats.StringColumnPreIndexStatsCollector;
+import 
org.apache.pinot.segment.local.segment.index.readers.forward.CLPForwardIndexReaderV1;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.creator.StatsCollectorConfig;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.IndexingConfig;
+import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableCustomConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.TenantConfig;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.util.TestUtils;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Round-trip test for the CLP forward index: log lines are written with {@link CLPForwardIndexCreatorV1} and read
+ * back with {@link CLPForwardIndexReaderV1}.
+ */
+public class CLPForwardIndexCreatorTest {
+  private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), "CLPForwardIndexCreatorTest");
+  private static final String COLUMN_NAME = "column1";
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    TestUtils.ensureDirectoriesExistAndEmpty(TEMP_DIR);
+  }
+
+  /**
+   * Collects stats for a handful of log lines, writes them through the creator and asserts that the reader returns
+   * every line unchanged. All closeable resources are released before the temp directory is deleted.
+   */
+  @Test
+  public void testCLPWriter()
+      throws Exception {
+    List<String> logLines = new ArrayList<>();
+    logLines.add(
+        "2023/10/26 00:03:10.168 INFO [PropertyCache] [HelixController-pipeline-default-pinot-(4a02a32c_DEFAULT)] "
+            + "Event pinot::DEFAULT::4a02a32c_DEFAULT : Refreshed 35 property LiveInstance took 5 ms. Selective: true");
+    logLines.add(
+        "2023/10/26 00:03:10.169 INFO [PropertyCache] [HelixController-pipeline-default-pinot-(4a02a32d_DEFAULT)] "
+            + "Event pinot::DEFAULT::4a02a32d_DEFAULT : Refreshed 81 property LiveInstance took 4 ms. Selective: true");
+    logLines.add(
+        "2023/10/27 16:35:10.470 INFO [ControllerResponseFilter] [grizzly-http-server-2] Handled request from 0.0"
+            + ".0.0 GET https://0.0.0.0:8443/health?checkType=liveness, content-type null status code 200 OK");
+    logLines.add(
+        "2023/10/27 16:35:10.607 INFO [ControllerResponseFilter] [grizzly-http-server-6] Handled request from 0.0"
+            + ".0.0 GET https://pinot-pinot-broker-headless.managed.svc.cluster.local:8093/tables, content-type "
+            + "application/json status code 200 OK");
+    logLines.add("null");
+
+    Schema schema = new Schema();
+    schema.addField(new DimensionFieldSpec(COLUMN_NAME, FieldSpec.DataType.STRING, true));
+    TableConfig tableConfig =
+        new TableConfig("mytable", TableType.REALTIME.name(), new SegmentsValidationAndRetentionConfig(),
+            new TenantConfig(null, null, null), new IndexingConfig(), new TableCustomConfig(null), null, null, null,
+            null, null, null, null, null, null, null, null, false, null, null, null);
+    // Typed factories instead of the raw EMPTY_LIST / EMPTY_MAP constants avoid unchecked conversions.
+    List<FieldConfig> fieldConfigList = new ArrayList<>();
+    fieldConfigList.add(new FieldConfig(COLUMN_NAME, FieldConfig.EncodingType.RAW, Collections.emptyList(),
+        FieldConfig.CompressionCodec.CLP, Collections.emptyMap()));
+    tableConfig.setFieldConfigList(fieldConfigList);
+    StatsCollectorConfig statsCollectorConfig = new StatsCollectorConfig(tableConfig, schema, null);
+    StringColumnPreIndexStatsCollector statsCollector =
+        new StringColumnPreIndexStatsCollector(COLUMN_NAME, statsCollectorConfig);
+    for (String logLine : logLines) {
+      statsCollector.collect(logLine);
+    }
+    statsCollector.seal();
+
+    File indexFile = new File(TEMP_DIR, COLUMN_NAME + V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION);
+    // try-with-resources closes the creator even when putString() or seal() throws.
+    try (CLPForwardIndexCreatorV1 creator =
+        new CLPForwardIndexCreatorV1(TEMP_DIR, COLUMN_NAME, logLines.size(), statsCollector)) {
+      for (String logLine : logLines) {
+        creator.putString(logLine);
+      }
+      creator.seal();
+    }
+
+    // Release the mapped buffer, the reader and the reader context before tearDown() deletes the directory.
+    try (PinotDataBuffer dataBuffer = PinotDataBuffer.mapReadOnlyBigEndianFile(indexFile);
+        CLPForwardIndexReaderV1 reader = new CLPForwardIndexReaderV1(dataBuffer, logLines.size());
+        CLPForwardIndexReaderV1.CLPReaderContext context = reader.createContext()) {
+      for (int i = 0; i < logLines.size(); i++) {
+        Assert.assertEquals(reader.getString(i, context), logLines.get(i));
+      }
+    }
+  }
+
+  @AfterClass
+  public void tearDown()
+      throws Exception {
+    FileUtils.deleteDirectory(TEMP_DIR);
+  }
+}
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/mutable/CLPMutableForwardIndexTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/mutable/CLPMutableForwardIndexTest.java
new file mode 100644
index 0000000000..a926b9a396
--- /dev/null
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/mutable/CLPMutableForwardIndexTest.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.forward.mutable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.pinot.segment.local.io.writer.impl.DirectMemoryManager;
+import 
org.apache.pinot.segment.local.realtime.impl.forward.CLPMutableForwardIndex;
+import 
org.apache.pinot.segment.local.segment.creator.impl.stats.CLPStatsProvider;
+import 
org.apache.pinot.segment.local.segment.creator.impl.stats.StringColumnPreIndexStatsCollector;
+import org.apache.pinot.segment.spi.memory.PinotDataBufferMemoryManager;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+/**
+ * Tests that {@link CLPMutableForwardIndex} returns the strings written to it and that the CLP stats it exposes
+ * match the stats produced by {@link StringColumnPreIndexStatsCollector.CLPStatsCollector} for the same input.
+ */
+public class CLPMutableForwardIndexTest {
+  private PinotDataBufferMemoryManager _memoryManager;
+
+  @BeforeClass
+  public void setUp() {
+    // Name the memory manager after this test class (it was previously named after a different test).
+    _memoryManager = new DirectMemoryManager(CLPMutableForwardIndexTest.class.getName());
+  }
+
+  @AfterClass
+  public void tearDown()
+      throws Exception {
+    _memoryManager.close();
+  }
+
+  @Test
+  public void testString()
+      throws IOException {
+    int initialCapacity = 5;
+    try (CLPMutableForwardIndex readerWriter = new CLPMutableForwardIndex("col1", FieldSpec.DataType.STRING,
+        _memoryManager, initialCapacity)) {
+      // Only the first `rows` log lines are written and verified.
+      // NOTE(review): the remaining lines are currently unused - consider exercising them as well.
+      int rows = 3;
+      List<String> logLines = new ArrayList<>();
+      logLines.add(
+          "2023/10/26 00:03:10.168 INFO [PropertyCache] [HelixController-pipeline-default-pinot-(4a02a32c_DEFAULT)] "
+              + "Event pinot::DEFAULT::4a02a32c_DEFAULT : Refreshed 35 property LiveInstance took 5 ms. Selective:"
+              + " true");
+      logLines.add(
+          "2023/10/26 00:03:10.169 INFO [PropertyCache] [HelixController-pipeline-default-pinot-(4a02a32d_DEFAULT)] "
+              + "Event pinot::DEFAULT::4a02a32d_DEFAULT : Refreshed 81 property LiveInstance took 4 ms. Selective:"
+              + " true");
+      logLines.add(
+          "2023/10/27 16:35:10.470 INFO [ControllerResponseFilter] [grizzly-http-server-2] Handled request from 0.0"
+              + ".0.0 GET https://0.0.0.0:8443/health?checkType=liveness, content-type null status code 200 OK");
+      logLines.add(
+          "2023/10/27 16:35:10.607 INFO [ControllerResponseFilter] [grizzly-http-server-6] Handled request from 0.0"
+              + ".0.0 GET https://pinot-pinot-broker-headless.managed.svc.cluster.local:8093/tables, content-type "
+              + "application/json status code 200 OK");
+      logLines.add("null");
+
+      for (int i = 0; i < rows; i++) {
+        readerWriter.setString(i, logLines.get(i));
+      }
+
+      for (int i = 0; i < rows; i++) {
+        Assert.assertEquals(readerWriter.getString(i), logLines.get(i));
+      }
+
+      // Build the expected CLP stats from the same rows with the standalone stats collector
+      StringColumnPreIndexStatsCollector.CLPStatsCollector statsCollector =
+          new StringColumnPreIndexStatsCollector.CLPStatsCollector();
+      for (int i = 0; i < rows; i++) {
+        statsCollector.collect(logLines.get(i));
+      }
+      statsCollector.seal();
+      CLPStatsProvider.CLPStats stats = statsCollector.getCLPStats();
+
+      // TestNG's assertEquals takes (actual, expected): the mutable index stats are the value under test
+      CLPStatsProvider.CLPStats mutableIndexStats = readerWriter.getCLPStats();
+      Assert.assertEquals(mutableIndexStats.getTotalNumberOfDictVars(), stats.getTotalNumberOfDictVars());
+      Assert.assertEquals(mutableIndexStats.getMaxNumberOfEncodedVars(), stats.getMaxNumberOfEncodedVars());
+      Assert.assertEquals(mutableIndexStats.getTotalNumberOfEncodedVars(), stats.getTotalNumberOfEncodedVars());
+      Assert.assertEquals(mutableIndexStats.getSortedLogTypeValues(), stats.getSortedLogTypeValues());
+      Assert.assertEquals(mutableIndexStats.getSortedDictVarValues(), stats.getSortedDictVarValues());
+    }
+  }
+}
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/ColumnIndexCreationInfo.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/ColumnIndexCreationInfo.java
index af0232d661..9e47502098 100644
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/ColumnIndexCreationInfo.java
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/ColumnIndexCreationInfo.java
@@ -43,6 +43,10 @@ public class ColumnIndexCreationInfo implements Serializable 
{
     _defaultNullValue = defaultNullValue;
   }
 
+  /**
+   * Returns the column statistics held by this creation info.
+   */
+  public ColumnStatistics getColumnStatistics() {
+    return _columnStatistics;
+  }
+
   public boolean isCreateDictionary() {
     return _createDictionary;
   }
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreationContext.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreationContext.java
index 5f4dd2cd66..52df382efa 100644
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreationContext.java
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreationContext.java
@@ -22,12 +22,14 @@ import java.io.File;
 import java.util.Objects;
 import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.index.IndexType;
+import org.apache.pinot.spi.config.table.IndexConfig;
 import org.apache.pinot.spi.config.table.IndexingConfig;
 import org.apache.pinot.spi.data.FieldSpec;
 
 
 /**
- * Provides parameters for constructing indexes via {@link 
IndexType#createIndexCreator(IndexCreationContext, Object)}.
+ * Provides parameters for constructing indexes via
+ * {@link IndexType#createIndexCreator(IndexCreationContext, IndexConfig)}.
  * The responsibility for ensuring that the correct parameters for a particular
  * index type lies with the caller.
  */
@@ -88,7 +90,10 @@ public interface IndexCreationContext {
    */
   boolean isTextCommitOnClose();
 
+  ColumnStatistics getColumnStatistics();
+
   final class Builder {
+    private ColumnStatistics _columnStatistics;
     private File _indexDir;
     private int _lengthOfLongestEntry;
     private int _maxNumberOfMultiValueElements;
@@ -116,11 +121,17 @@ public interface IndexCreationContext {
           .withMaxValue((Comparable<?>) columnIndexCreationInfo.getMax())
           
.withTotalNumberOfEntries(columnIndexCreationInfo.getTotalNumberOfEntries())
           
.withSortedUniqueElementsArray(columnIndexCreationInfo.getSortedUniqueElementsArray())
+          .withColumnStatistics(columnIndexCreationInfo.getColumnStatistics())
           .withCardinality(columnIndexCreationInfo.getDistinctValueCount())
           .withFixedLength(columnIndexCreationInfo.isFixedLength())
           .sorted(columnIndexCreationInfo.isSorted());
     }
 
+    /**
+     * Sets the column statistics that {@code build()} hands to the created context.
+     *
+     * @param columnStatistics statistics of the column being indexed
+     * @return this builder, for chaining
+     */
+    public Builder withColumnStatistics(ColumnStatistics columnStatistics) {
+      _columnStatistics = columnStatistics;
+      return this;
+    }
+
     public Builder withIndexDir(File indexDir) {
       _indexDir = indexDir;
       return this;
@@ -222,7 +233,7 @@ public interface IndexCreationContext {
       return new Common(Objects.requireNonNull(_indexDir), 
_lengthOfLongestEntry, _maxNumberOfMultiValueElements,
           _maxRowLengthInBytes, _onHeap, Objects.requireNonNull(_fieldSpec), 
_sorted, _cardinality,
           _totalNumberOfEntries, _totalDocs, _hasDictionary, _minValue, 
_maxValue, _forwardIndexDisabled,
-          _sortedUniqueElementsArray, _optimizedDictionary, _fixedLength, 
_textCommitOnClose);
+          _sortedUniqueElementsArray, _optimizedDictionary, _fixedLength, 
_textCommitOnClose, _columnStatistics);
     }
 
     public Builder withSortedUniqueElementsArray(Object 
sortedUniqueElementsArray) {
@@ -255,13 +266,14 @@ public interface IndexCreationContext {
     private final boolean _optimizeDictionary;
     private final boolean _fixedLength;
     private final boolean _textCommitOnClose;
+    private final ColumnStatistics _columnStatistics;
 
     public Common(File indexDir, int lengthOfLongestEntry,
         int maxNumberOfMultiValueElements, int maxRowLengthInBytes, boolean 
onHeap,
         FieldSpec fieldSpec, boolean sorted, int cardinality, int 
totalNumberOfEntries,
         int totalDocs, boolean hasDictionary, Comparable<?> minValue, 
Comparable<?> maxValue,
         boolean forwardIndexDisabled, Object sortedUniqueElementsArray, 
boolean optimizeDictionary,
-        boolean fixedLength, boolean textCommitOnClose) {
+        boolean fixedLength, boolean textCommitOnClose, ColumnStatistics 
columnStatistics) {
       _indexDir = indexDir;
       _lengthOfLongestEntry = lengthOfLongestEntry;
       _maxNumberOfMultiValueElements = maxNumberOfMultiValueElements;
@@ -280,6 +292,7 @@ public interface IndexCreationContext {
       _optimizeDictionary = optimizeDictionary;
       _fixedLength = fixedLength;
       _textCommitOnClose = textCommitOnClose;
+      _columnStatistics = columnStatistics;
     }
 
     public FieldSpec getFieldSpec() {
@@ -360,5 +373,10 @@ public interface IndexCreationContext {
     public boolean isTextCommitOnClose() {
       return _textCommitOnClose;
     }
+
+    /**
+     * Returns the column statistics supplied at construction. The builder does not default or null-check this value,
+     * so it is {@code null} when no statistics were provided.
+     */
+    @Override
+    public ColumnStatistics getColumnStatistics() {
+      return _columnStatistics;
+    }
   }
 }
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java
index 52abf41ea1..6305dcd852 100644
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java
@@ -274,7 +274,8 @@ public class SegmentGeneratorConfig implements Serializable 
{
     if (fieldConfigList != null) {
       for (FieldConfig fieldConfig : fieldConfigList) {
         if (fieldConfig.getEncodingType() == FieldConfig.EncodingType.RAW
-            && fieldConfig.getCompressionCodec() != null) {
+            && fieldConfig.getCompressionCodec() != null
+            && fieldConfig.getCompressionCodec().isApplicableToRawIndex()) {
           _rawIndexCreationColumns.add(fieldConfig.getName());
           _rawIndexCompressionType.put(fieldConfig.getName(),
               
ChunkCompressionType.valueOf(fieldConfig.getCompressionCodec().name()));
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/StatsCollectorConfig.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/StatsCollectorConfig.java
index f868eea15a..fdd8abfcd7 100644
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/StatsCollectorConfig.java
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/StatsCollectorConfig.java
@@ -19,8 +19,10 @@
 package org.apache.pinot.segment.spi.creator;
 
 import com.google.common.base.Preconditions;
+import java.util.HashMap;
 import java.util.Map;
 import javax.annotation.Nullable;
+import org.apache.pinot.spi.config.table.FieldConfig;
 import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.FieldSpec;
@@ -36,6 +38,7 @@ public class StatsCollectorConfig {
   private final TableConfig _tableConfig;
   private final Schema _schema;
   private final SegmentPartitionConfig _segmentPartitionConfig;
+  private final Map<String, FieldConfig> _columnFieldConfigMap;
 
   /**
    * Constructor for the class.
@@ -49,6 +52,12 @@ public class StatsCollectorConfig {
     _tableConfig = tableConfig;
     _schema = schema;
     _segmentPartitionConfig = segmentPartitionConfig;
+    _columnFieldConfigMap = new HashMap<>();
+    if (tableConfig.getFieldConfigList() != null) {
+      for (FieldConfig fieldConfig : tableConfig.getFieldConfigList()) {
+        _columnFieldConfigMap.put(fieldConfig.getName(), fieldConfig);
+      }
+    }
   }
 
   @Nullable
@@ -89,4 +98,9 @@ public class StatsCollectorConfig {
   public TableConfig getTableConfig() {
     return _tableConfig;
   }
+
+  /**
+   * Looks up the field config of a column in the map built from the table config's field config list.
+   *
+   * @param column name of the column
+   * @return the field config for the column, or {@code null} if the table config declares none for it
+   */
+  @Nullable
+  public FieldConfig getFieldConfigForColumn(String column) {
+    return _columnFieldConfigMap.get(column);
+  }
 }
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java
index 5db086a13b..132705036b 100644
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java
@@ -58,6 +58,7 @@ public class ForwardIndexConfig extends IndexConfig {
     if (compressionCodec != null) {
       switch (compressionCodec) {
         case PASS_THROUGH:
+        case CLP:
           _chunkCompressionType = ChunkCompressionType.PASS_THROUGH;
           _dictIdCompressionType = null;
           break;
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java
index 16b50ecbde..704cb2e01c 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java
@@ -126,6 +126,9 @@ public class FieldConfig extends BaseJsonConfig {
     SNAPPY(true, false),
     ZSTANDARD(true, false),
     LZ4(true, false),
+    // CLP is a special type of compression codec that isn't generally applicable to all RAW columns and has
+    // special handling for log lines (see {@link CLPForwardIndexCreatorV1})
+    CLP(false, false),
 
     // For MV dictionary encoded forward index, add a second level dictionary 
encoding for the multi-value entries
     MV_ENTRY_DICT(false, true);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to