This is an automated email from the ASF dual-hosted git repository.

lqc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 8774d320c4 Add and use CLPMutableForwardIndexV2 by default to improve ingestion performance and efficiency (#14241)
8774d320c4 is described below

commit 8774d320c4f789772f3a512fd290f6182625c140
Author: Jack Luo <[email protected]>
AuthorDate: Wed Oct 23 06:00:21 2024 +0800

    Add and use CLPMutableForwardIndexV2 by default to improve ingestion performance and efficiency (#14241)
    
    * Add the initial implementation of CLPMutableForwardIndexV2
    
    * Upgraded clp-ffi.version from 0.4.6 to 0.4.7
    
    * Updated comments
    
    * Address code review concerns
    
    * Enable CLPMutableForwardIndexV2 to be compatible with CLPMutableForwardIndex during mutable->immutable segment conversion.
    
    * Change Pinot to use CLPMutableForwardIndexV2 by default
    
    * Fix integration issues related to dictionary encoding.
    
    * Fix left-over bugs introduced by class name changes.
    
    * Fix a small bug in CLPMutableForwardIndexV2#appendEncodedMessage related to Pinot null value handling.
    
    * Fix style
---
 .../stats/MutableNoDictionaryColStatistics.java    |   3 +
 .../impl/forward/CLPMutableForwardIndexV2.java     | 512 +++++++++++++++++++++
 .../segment/index/forward/ForwardIndexType.java    |   8 +-
 .../mutable/CLPMutableForwardIndexTest.java        | 100 ++--
 .../mutable/CLPMutableForwardIndexV2Test.java      | 162 +++++++
 5 files changed, 743 insertions(+), 42 deletions(-)

diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/MutableNoDictionaryColStatistics.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/MutableNoDictionaryColStatistics.java
index 5f2d98893b..476bb5e10b 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/MutableNoDictionaryColStatistics.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/MutableNoDictionaryColStatistics.java
@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
 import java.util.Map;
 import java.util.Set;
 import 
org.apache.pinot.segment.local.realtime.impl.forward.CLPMutableForwardIndex;
+import 
org.apache.pinot.segment.local.realtime.impl.forward.CLPMutableForwardIndexV2;
 import 
org.apache.pinot.segment.local.segment.creator.impl.stats.CLPStatsProvider;
 import org.apache.pinot.segment.spi.creator.ColumnStatistics;
 import org.apache.pinot.segment.spi.datasource.DataSource;
@@ -118,6 +119,8 @@ public class MutableNoDictionaryColStatistics implements 
ColumnStatistics, CLPSt
   public CLPStats getCLPStats() {
     if (_forwardIndex instanceof CLPMutableForwardIndex) {
       return ((CLPMutableForwardIndex) _forwardIndex).getCLPStats();
+    } else if (_forwardIndex instanceof CLPMutableForwardIndexV2) {
+      return ((CLPMutableForwardIndexV2) _forwardIndex).getCLPStats();
     }
     throw new IllegalStateException(
         "CLP stats not available for column: " + 
_dataSourceMetadata.getFieldSpec().getName());
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/forward/CLPMutableForwardIndexV2.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/forward/CLPMutableForwardIndexV2.java
new file mode 100644
index 0000000000..8cd940d2df
--- /dev/null
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/forward/CLPMutableForwardIndexV2.java
@@ -0,0 +1,512 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.realtime.impl.forward;
+
+import com.yscope.clp.compressorfrontend.BuiltInVariableHandlingRuleVersions;
+import com.yscope.clp.compressorfrontend.EncodedMessage;
+import com.yscope.clp.compressorfrontend.FlattenedByteArray;
+import com.yscope.clp.compressorfrontend.FlattenedByteArrayFactory;
+import com.yscope.clp.compressorfrontend.MessageDecoder;
+import com.yscope.clp.compressorfrontend.MessageEncoder;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import javax.validation.constraints.NotNull;
+import 
org.apache.pinot.segment.local.realtime.impl.dictionary.BytesOffHeapMutableDictionary;
+import 
org.apache.pinot.segment.local.segment.creator.impl.stats.CLPStatsProvider;
+import org.apache.pinot.segment.spi.index.mutable.MutableForwardIndex;
+import org.apache.pinot.segment.spi.memory.PinotDataBufferMemoryManager;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This mutable forward index implements a composite index for string-typed 
columns with dynamic encoding options:
+ * <ol>
+ *   <li>Pure CLP dictionary encoding when the dictionary cardinality is below 
a configurable threshold.</li>
+ *   <li>CLP dictionary encoding combined with a raw string forward index when 
the dictionary cardinality exceeds
+ *   the threshold.</li>
+ * </ol>
+ * <p>
+ * Initially, CLP encoding transforms a high-cardinality log message string 
into three data columns:
+ * <ul>
+ *   <li>Logtype (very low cardinality) - essentially an inferred format 
string of the log</li>
+ *   <li>Dictionary variables (medium cardinality) - variables with both 
alphabets and numbers</li>
+ *   <li>Encoded variables (high cardinality) - pure fixed point and floating 
point numbers</li>
+ * </ul>
+ * The logtype and dictionary variables are dictionary-encoded, while the 
encoded variables are stored as longs.
+ * Notably, both {@code encodedVarIds} and {@code encodedVars} are 
multi-valued, but they are stored using a
+ * flattened single-value mutable forward index, along with a separate forward 
index to capture the end offsets
+ * for each multi-valued document. This approach is necessary because the 
maximum number of values per document
+ * is unknown during ingestion, unlike the existing multi-value forward index, 
which requires this information
+ * upfront. During the conversion from mutable to immutable forward index, the 
two single-value mutable indices
+ * are merged into a single immutable multi-valued forward index, as the max 
length is known at conversion time.
+ * <p>
+ * During ingestion, if the cardinality of either the {@code logtypeId} or 
{@code dictVarID} exceeds a predefined
+ * threshold, the ingestion mode switches to a raw bytes forward index for 
subsequent documents. Maintaining very
+ * large dictionaries is inefficient in Pinot due to memory and I/O 
constraints (memory-mapped). Switching to a
+ * raw bytes forward index helps avoid these issues. During reads, if the 
requested {@code docId} is in the raw
+ * forward index, the raw bytes are returned. Otherwise, the log type, 
dictionary variables, and encoded variables
+ * are decoded using the CLP decoder to return the original log message's 
bytes.
+ *
+ * <p><b>Note on Write and Read Operations:</b> Writes are strictly 
sequential, while reads can be performed
+ * randomly. The supported append operations are:</p>
+ * <ul>
+ *   <li>{@code setString(int docId, String value)} - Encodes the log message 
using CLP and invokes
+ *   {@code appendEncodedMessage(@NotNull EncodedMessage 
clpEncodedMessage)}.</li>
+ *   <li>{@code appendEncodedMessage(@NotNull EncodedMessage 
clpEncodedMessage)}</li>
+ * </ul>
+ *
+ * <p><b>Limitations:</b> The current CLP mutable forward index does not 
achieve the same compression ratio as the
+ * original standalone CLP implementation, primarily due to design differences 
between Pinot and CLP. While Pinot
+ * is optimized for fast random access, CLP is designed for single-pass 
streaming compression and search. As a result,
+ * Pinot sacrifices some compression efficiency for these primary reasons:
+ * <ul>
+ *   <li>Pinot implementation uses block compression compared to CLP’s 
standalone streaming compression, which achieves
+ *   much better compression ratio at the cost of random access 
performance.</li>
+ *   <li>Pinot implementation uses uncompressed dictionaries for random 
lookups of log types and dictionary variables,
+ *   whereas CLP employs compressed dictionaries suited only for single-pass 
streaming queries.</li>
+ *   <li>Pinot stores additional offsets and length metadata for log types, 
dictionary variables, and encoded
+ *   variables:</li>
+ *   <ul>
+ *      <li>Streaming forward indices used by CLP's standalone implementation, 
do not need to store document start or
+ *      end markers because the boundary of one document naturally aligns with 
the next.</li>
+ *      <li>CLP avoids storing the number of {@code dictVars} and {@code 
encodedVars}, as this information is already
+ *      embedded in the log type and available during decoding. Pinot, 
however, needs to store this metadata,
+ *      which can sometimes take up more space than the data itself when 
compressed.</li>
+ *   </ul>
+ * </ul>
+ * Additionally, CLP standalone binaries can perform search without 
decompressing the data into plain text. The most
+ * common query type on log data is partial matching. Frequently, searches can 
be completed by scanning only the logtype
+ * dictionaries for partial matches. Searches can also be executed on {@code 
logtypeId} and {@code dictVarId} directly
+ * by first performing lookup on the dictionaries to get a subset of 
dictionary ids to filter, whereas in Pinot,
+ * each dictionary id must be first converted back to strings, followed by a 
brute-force search on the corresponding
+ * string value. For these reasons, direct searching on CLP columns in Pinot 
is not yet implemented but may be
+ * included in future updates.</p>
+ */
+public class CLPMutableForwardIndexV2 implements MutableForwardIndex {
+  protected static final Logger LOGGER = 
LoggerFactory.getLogger(CLPMutableForwardIndexV2.class);
+  public final String _columnName;
+
+  protected final EncodedMessage _clpEncodedMessage;
+  protected final MessageEncoder _clpMessageEncoder;
+  protected final MessageDecoder _clpMessageDecoder;
+
+  protected int _nextDocId = 0;
+  protected int _nextDictVarDocId = 0;
+  protected int _nextEncodedVarId = 0;
+  protected int _bytesRawFwdIndexDocIdStartOffset = Integer.MAX_VALUE;
+  protected boolean _isClpEncoded = true;
+  protected int _lengthOfLongestElement;
+  protected int _lengthOfShortestElement;
+
+  protected int _maxNumDictVarIdPerDoc = 0;
+  protected int _maxNumEncodedVarPerDoc = 0;
+
+  protected int _numDocsWithNoDictVar = 0;
+  protected int _numDocsWithNoEncodedVar = 0;
+
+  protected VarByteSVMutableForwardIndex _rawBytes;
+  protected BytesOffHeapMutableDictionary _logtypeDict;
+  protected FixedByteSVMutableForwardIndex _logtypeId;
+  protected BytesOffHeapMutableDictionary _dictVarDict;
+  protected FixedByteSVMutableForwardIndex _dictVarOffset;
+  protected FixedByteSVMutableForwardIndex _dictVarId;
+  protected FixedByteSVMutableForwardIndex _encodedVarOffset;
+  protected FixedByteSVMutableForwardIndex _encodedVar;
+
+  // Various forward index and dictionary configurations with default values
+  // TODO: Provide more optimized default values in the future
+  protected int _estimatedMaxDocCount = 4096;
+  protected int _rawMessageEstimatedAvgEncodedLength = 256;
+  protected int _estimatedLogtypeAvgEncodedLength = 256;
+  protected int _logtypeIdNumRowsPerChunk = _estimatedMaxDocCount;
+  protected int _logtypeDictEstimatedCardinality = _estimatedMaxDocCount / 16;
+  protected int _dictVarDictEstimatedCardinality = _estimatedMaxDocCount / 8;
+  protected int _logtypeDictMaxOverflowHashSize = 128;
+  protected int _dictVarEstimatedAverageLength = 64;
+  protected int _dictVarOffsetPerChunk = 4 * 1024;
+  protected int _dictVarIdPerChunk = 256 * 1024;
+  protected int _dictVarDictMaxOverflowHashSize = 256;
+  protected int _encodedVarOffsetPerChunk = 4 * 1024;
+  protected int _encodedVarPerChunk = 256 * 1024;
+
+  // Dynamic CLP dictionary encoding configs
+  protected int _minNumDocsBeforeCardinalityMonitoring = _estimatedMaxDocCount 
/ 16;
+  protected boolean _forceEnableClpEncoding = false;
+  protected int _inverseLogtypeCardinalityRatioStopThreshold = 10;
+  protected int _inverseDictVarCardinalityRatioStopThreshold = 10;
+
+  public CLPMutableForwardIndexV2(String columnName, 
PinotDataBufferMemoryManager memoryManager) {
+    _columnName = columnName;
+
+    // Initialize clp-ffi datastructures
+    _clpEncodedMessage = new EncodedMessage();
+    _clpMessageEncoder = new 
MessageEncoder(BuiltInVariableHandlingRuleVersions.VariablesSchemaV2,
+        BuiltInVariableHandlingRuleVersions.VariableEncodingMethodsV1);
+    _clpMessageDecoder = new 
MessageDecoder(BuiltInVariableHandlingRuleVersions.VariablesSchemaV2,
+        BuiltInVariableHandlingRuleVersions.VariableEncodingMethodsV1);
+
+    // Raw forward index stored as bytes
+    _rawBytes = new VarByteSVMutableForwardIndex(FieldSpec.DataType.BYTES, 
memoryManager, columnName + "_rawBytes.fwd",
+        _estimatedMaxDocCount, _rawMessageEstimatedAvgEncodedLength);
+
+    // LogtypeId + logtype dictionary
+    _logtypeId =
+        new FixedByteSVMutableForwardIndex(false, FieldSpec.DataType.INT, 
_logtypeIdNumRowsPerChunk, memoryManager,
+            columnName + "_logtypeId.fwd");
+    _logtypeDict = new 
BytesOffHeapMutableDictionary(_logtypeDictEstimatedCardinality, 
_logtypeDictMaxOverflowHashSize,
+        memoryManager, columnName + "_logtype.dict", 
_estimatedLogtypeAvgEncodedLength);
+
+    // DictVars
+    _dictVarOffset =
+        new FixedByteSVMutableForwardIndex(false, FieldSpec.DataType.INT, 
_dictVarOffsetPerChunk, memoryManager,
+            columnName + "_dictVarOffsets.fwd");
+    _dictVarId = new FixedByteSVMutableForwardIndex(false, 
FieldSpec.DataType.INT, _dictVarIdPerChunk, memoryManager,
+        columnName + "_dictVarIds.fwd");
+    _dictVarDict = new 
BytesOffHeapMutableDictionary(_dictVarDictEstimatedCardinality, 
_dictVarDictMaxOverflowHashSize,
+        memoryManager, columnName + "_dictVar.dict", 
_dictVarEstimatedAverageLength);
+
+    // EncodedVars
+    _encodedVarOffset =
+        new FixedByteSVMutableForwardIndex(false, FieldSpec.DataType.INT, 
_encodedVarOffsetPerChunk, memoryManager,
+            columnName + "_encodedVarOffsets.fwd");
+    _encodedVar = new FixedByteSVMutableForwardIndex(false, 
FieldSpec.DataType.LONG, _encodedVarPerChunk, memoryManager,
+        columnName + "_encodedVar.fwd");
+
+    // Setup offsets used to access flattened dictVarId and encoded var 
multi-value docs
+    _dictVarOffset.setInt(0, 0);
+    _encodedVarOffset.setInt(0, 0);
+  }
+
+  /**
+   * Sets a string value in the forward index.
+   * <p>
+   * This method appends the given string value to the forward index. The 
provided `docId` is ignored as this mutable
+   * forward index only supports sequential writes (append operations) rather 
than random access based on document IDs.
+   * Only reads can be random.
+   *
+   * @param docId The document ID (ignored in this implementation).
+   * @param value The string value to append to the forward index.
+   */
+  @Override
+  public void setString(int docId, String value) {
+    // docId is intentionally ignored because this forward index only supports 
sequential writes (append only)
+    try {
+      _clpMessageEncoder.encodeMessage(value, _clpEncodedMessage);
+      appendEncodedMessage(_clpEncodedMessage);
+    } catch (IOException e) {
+      throw new IllegalArgumentException("Failed to encode message: " + value, 
e);
+    }
+  }
+
+  /**
+   * Appends an encoded message to the forward index.
+   * <p>
+   * This method processes the provided {@link EncodedMessage} by first 
flattening the dictionary variables to ensure
+   * efficient data access through the lower-level clp-ffi API. The method 
handles potential null values within the
+   * encoded message by replacing them with empty arrays, as Pinot does not 
accept null values.
+   *
+   * @param clpEncodedMessage The {@link EncodedMessage} to append.
+   * @throws IOException if an I/O error occurs during the appending process.
+   */
+  public void appendEncodedMessage(@NotNull EncodedMessage clpEncodedMessage)
+      throws IOException {
+    if (_isClpEncoded) {
+      _logtypeId.setInt(_nextDocId, 
_logtypeDict.index(clpEncodedMessage.getLogtype()));
+
+      FlattenedByteArray flattenedDictVars = 
clpEncodedMessage.getDictionaryVarsAsFlattenedByteArray();
+      if (null == flattenedDictVars || 0 == flattenedDictVars.size()) {
+        _numDocsWithNoDictVar++;
+      } else {
+        for (byte[] dictVar : flattenedDictVars) {
+          _dictVarId.setInt(_nextDictVarDocId++, _dictVarDict.index(dictVar));
+        }
+        _maxNumDictVarIdPerDoc = Math.max(_maxNumDictVarIdPerDoc, 
flattenedDictVars.size());
+      }
+      _dictVarOffset.setInt(_nextDocId, _nextDictVarDocId);
+
+      // EncodedVars column typically have fairly high cardinality, so we skip 
dictionary encoding entirely
+      long[] encodedVars = clpEncodedMessage.getEncodedVars();
+      if (null == encodedVars || 0 == encodedVars.length) {
+        _numDocsWithNoEncodedVar++;
+      } else {
+        for (long encodedVar : encodedVars) {
+          _encodedVar.setLong(_nextEncodedVarId++, encodedVar);
+        }
+        _maxNumEncodedVarPerDoc = Math.max(_maxNumEncodedVarPerDoc, 
encodedVars.length);
+      }
+      _encodedVarOffset.setInt(_nextDocId, _nextEncodedVarId);
+
+      // Turn off clp encoding when dictionary size is exceeded
+      if (_nextDocId > _minNumDocsBeforeCardinalityMonitoring) {
+        int inverseLogtypeCardinalityRatio = _nextDocId / 
_logtypeDict.length();
+        if (inverseLogtypeCardinalityRatio < 
_inverseLogtypeCardinalityRatioStopThreshold) {
+          _isClpEncoded = false;
+          _bytesRawFwdIndexDocIdStartOffset = _nextDocId + 1;
+        } else if (_dictVarDict.length() > 0) {
+          int inverseDictVarCardinalityRatio = _nextDictVarDocId / 
_dictVarDict.length();
+          if (inverseDictVarCardinalityRatio < 
_inverseDictVarCardinalityRatioStopThreshold) {
+            _isClpEncoded = false;
+            _bytesRawFwdIndexDocIdStartOffset = _nextDocId + 1;
+          }
+        }
+      }
+    } else {
+      _rawBytes.setBytes(_nextDocId - _bytesRawFwdIndexDocIdStartOffset, 
clpEncodedMessage.getMessage());
+    }
+    _nextDocId++;
+
+    // Update mutable index statistics for compatibility purposes only
+    _lengthOfLongestElement = Math.max(_lengthOfLongestElement, 
clpEncodedMessage.getMessage().length);
+    _lengthOfShortestElement = Math.min(_lengthOfShortestElement, 
clpEncodedMessage.getMessage().length);
+  }
+
+  public int getNumDoc() {
+    return _nextDocId;
+  }
+
+  public int getNumLogtype() {
+    return _isClpEncoded ? _nextDocId : 0;
+  }
+
+  public int getNumDictVar() {
+    return _isClpEncoded ? _nextDictVarDocId : 0;
+  }
+
+  public int getNumEncodedVar() {
+    return _isClpEncoded ? _nextEncodedVarId : 0;
+  }
+
+  /**
+   * Forces the use of CLP dictionary encoding, overriding any automatic 
encoding decisions.
+   *
+   * <p><b>Note:</b> This method is exclusive to {@code forceRawEncoding}; 
enabling CLP dictionary
+   * encoding will disable any forced raw encoding. Only one of these methods 
can be active at a time.</p>
+   */
+  public void forceClpEncoding() {
+    _forceEnableClpEncoding = true;
+  }
+
+  /**
+   * Forces the use of raw encoding, overriding any automatic encoding 
decisions.
+   *
+   * <p><b>Note:</b> This method is exclusive to {@code forceClpEncoding}; 
enabling raw encoding will
+   * disable clp encoding. Only one of these methods can be active at a 
time.</p>
+   */
+  public void forceRawEncoding() {
+    _isClpEncoded = false;
+    _bytesRawFwdIndexDocIdStartOffset = 0;
+  }
+
+  public String getColumnName() {
+    return _columnName;
+  }
+
+  @Override
+  public String getString(int docId) {
+    return new String(getRawBytes(docId), StandardCharsets.UTF_8);
+  }
+
+  public byte[] getRawBytes(int docId) {
+    if (docId < 0) {
+      throw new IllegalArgumentException("Invalid docId: " + docId);
+    }
+    if (docId < _bytesRawFwdIndexDocIdStartOffset) {
+      // Decode from clp column
+      byte[] logtype = _logtypeDict.get(_logtypeId.getInt(docId));
+
+      int dictVarIdBeginOffset = (0 == docId) ? 0 : 
_dictVarOffset.getInt(docId - 1);
+      int dictVarIdEndOffset = _dictVarOffset.getInt(docId);
+      byte[][] dictVars = new byte[dictVarIdEndOffset - 
dictVarIdBeginOffset][];
+      for (int i = 0; i < dictVars.length; i++) {
+        dictVars[i] = _dictVarDict.get(_dictVarId.getInt(dictVarIdBeginOffset 
+ i));
+      }
+      FlattenedByteArray flattenedDictVars = 
FlattenedByteArrayFactory.fromByteArrays(dictVars);
+
+      int encodedVarIdBeginOffset = (0 == docId) ? 0 : 
_encodedVarOffset.getInt(docId - 1);
+      int encodedVarIdEndOffset = _encodedVarOffset.getInt(docId);
+      long[] encodedVars = new long[encodedVarIdEndOffset - 
encodedVarIdBeginOffset];
+      for (int i = 0; i < encodedVars.length; i++) {
+        encodedVars[i] = _encodedVar.getLong(encodedVarIdBeginOffset + i);
+      }
+
+      try {
+        return _clpMessageDecoder.decodeMessageAsBytes(logtype, 
flattenedDictVars, encodedVars);
+      } catch (IOException e) {
+        throw new IllegalArgumentException(
+            "Failed to encode message: " + new String(logtype, 
StandardCharsets.ISO_8859_1), e);
+      }
+    } else {
+      return _rawBytes.getBytes(docId - _bytesRawFwdIndexDocIdStartOffset);
+    }
+  }
+
+  public FixedByteSVMutableForwardIndex getLogtypeId() {
+    return _logtypeId;
+  }
+
+  public BytesOffHeapMutableDictionary getLogtypeDict() {
+    return _logtypeDict;
+  }
+
+  public FixedByteSVMutableForwardIndex getDictVarOffset() {
+    return _dictVarOffset;
+  }
+
+  public FixedByteSVMutableForwardIndex getDictVarId() {
+    return _dictVarId;
+  }
+
+  public BytesOffHeapMutableDictionary getDictVarDict() {
+    return _dictVarDict;
+  }
+
+  public FixedByteSVMutableForwardIndex getEncodedVarOffset() {
+    return _encodedVarOffset;
+  }
+
+  public FixedByteSVMutableForwardIndex getEncodedVar() {
+    return _encodedVar;
+  }
+
+  /**
+   * Returns whether the mutable forward index is currently using CLP encoding.
+   *
+   * <p>Note that this reflects the current state, and the use of CLP encoding 
may change dynamically as new documents
+   * are ingested. CLP encoding can be enabled or disabled based on factors 
such as cardinality
+   * thresholds or forced encoding settings.</p>
+   *
+   * @return {@code true} if the forward index is currently using CLP 
encoding; {@code false} otherwise.
+   */
+  public boolean isClpEncoded() {
+    return _isClpEncoded;
+  }
+
+  @Override
+  public boolean isDictionaryEncoded() {
+    return false;
+  }
+
+  @Override
+  public int getLengthOfLongestElement() {
+    return _lengthOfLongestElement;
+  }
+
+  @Override
+  public int getLengthOfShortestElement() {
+    return _lengthOfShortestElement;
+  }
+
+  public int getMaxNumDictVarIdPerDoc() {
+    return _maxNumDictVarIdPerDoc;
+  }
+
+  public int getMaxNumEncodedVarPerDoc() {
+    return _maxNumEncodedVarPerDoc;
+  }
+
+  /**
+   * Compatibility method for generating statistic objects to be used by 
CLPForwardIndexCreatorV1 only.
+   */
+  public CLPStatsProvider.CLPStats getCLPStats() {
+    if (!isClpEncoded()) {
+      throw new UnsupportedOperationException(
+          "CLP encoding is required for compatibility support. Please call the 
forceClpEncoding() "
+              + "method immediately after class initialization to ensure 
compatibility.");
+    }
+
+    // To generate a compatible stats object, we'll need to do some 
post-processing.
+    String[] sortedLogtypeDictValues = 
getSortedDictionaryValuesAsStrings(_logtypeDict, StandardCharsets.ISO_8859_1);
+    String[] sortedDictVarDictValues = 
getSortedDictionaryValuesAsStrings(_dictVarDict, StandardCharsets.UTF_8);
+    int totalNumberOfDictVars = _nextDictVarDocId;
+    int totalNumberOfEncodedVars = _nextEncodedVarId;
+    int maxNumberOfEncodedVars = _maxNumEncodedVarPerDoc;
+    return new CLPStatsProvider.CLPStats(sortedLogtypeDictValues, 
sortedDictVarDictValues, totalNumberOfDictVars,
+        totalNumberOfEncodedVars, maxNumberOfEncodedVars);
+  }
+
+  public String[] 
getSortedDictionaryValuesAsStrings(BytesOffHeapMutableDictionary dict, Charset 
charset) {
+    // Adapted from StringOffHeapMutableDictionary#getSortedValues()
+    int numValues = dict.length();
+    String[] sortedValues = new String[numValues];
+    for (int dictId = 0; dictId < numValues; dictId++) {
+      sortedValues[dictId] = new String(dict.getBytesValue(dictId), charset);
+    }
+
+    Arrays.sort(sortedValues);
+    return sortedValues;
+  }
+
+  @Override
+  public boolean isSingleValue() {
+    return true;
+  }
+
+  @Override
+  public FieldSpec.DataType getStoredType() {
+    return FieldSpec.DataType.STRING;
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    _rawBytes.close();
+    closeClpLogtypeIndex();
+  }
+
+  protected void closeClpLogtypeIndex()
+      throws IOException {
+    // LoogtypeId
+    if (_logtypeDict != null) {
+      _logtypeDict.close();
+      _logtypeDict = null;
+    }
+    if (_logtypeId != null) {
+      _logtypeId.close();
+      _logtypeId = null;
+    }
+
+    // DictVarsIds
+    if (_dictVarOffset != null) {
+      _dictVarOffset.close();
+      _dictVarOffset = null;
+    }
+    if (_dictVarDict != null) {
+      _dictVarDict.close();
+      _dictVarDict = null;
+    }
+    if (_dictVarId != null) {
+      _dictVarId.close();
+      _dictVarId = null;
+    }
+
+    // EncodedVars
+    if (_encodedVarOffset != null) {
+      _encodedVarOffset.close();
+      _encodedVarOffset = null;
+    }
+    if (_encodedVar != null) {
+      _encodedVar.close();
+      _encodedVar = null;
+    }
+  }
+}
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexType.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexType.java
index 3846b983fb..74b0b49c8b 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexType.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexType.java
@@ -26,7 +26,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
-import 
org.apache.pinot.segment.local.realtime.impl.forward.CLPMutableForwardIndex;
+import 
org.apache.pinot.segment.local.realtime.impl.forward.CLPMutableForwardIndexV2;
 import 
org.apache.pinot.segment.local.realtime.impl.forward.FixedByteMVMutableForwardIndex;
 import 
org.apache.pinot.segment.local.realtime.impl.forward.FixedByteSVMutableForwardIndex;
 import 
org.apache.pinot.segment.local.realtime.impl.forward.VarByteSVMutableForwardIndex;
@@ -251,7 +251,11 @@ public class ForwardIndexType extends 
AbstractIndexType<ForwardIndexConfig, Forw
           int initialCapacity =
               Math.min(context.getCapacity(), 
NODICT_VARIABLE_WIDTH_ESTIMATED_NUMBER_OF_VALUES_DEFAULT);
           if (config.getCompressionCodec() == CompressionCodec.CLP) {
-            return new CLPMutableForwardIndex(column, storedType, 
context.getMemoryManager(), context.getCapacity());
+            CLPMutableForwardIndexV2 clpMutableForwardIndex =
+                new CLPMutableForwardIndexV2(column, 
context.getMemoryManager());
+            // TODO: enable config to invoke forceClpDictionaryEncoding() 
on-demand
+            clpMutableForwardIndex.forceClpEncoding();
+            return clpMutableForwardIndex;
           }
           return new VarByteSVMutableForwardIndex(storedType, 
context.getMemoryManager(), allocationContext,
               initialCapacity, 
NODICT_VARIABLE_WIDTH_ESTIMATED_AVERAGE_VALUE_LENGTH_DEFAULT);
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/mutable/CLPMutableForwardIndexTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/mutable/CLPMutableForwardIndexTest.java
index a926b9a396..fde7d677ab 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/mutable/CLPMutableForwardIndexTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/mutable/CLPMutableForwardIndexTest.java
@@ -23,8 +23,10 @@ import java.util.ArrayList;
 import java.util.List;
 import org.apache.pinot.segment.local.io.writer.impl.DirectMemoryManager;
 import 
org.apache.pinot.segment.local.realtime.impl.forward.CLPMutableForwardIndex;
+import 
org.apache.pinot.segment.local.realtime.impl.forward.CLPMutableForwardIndexV2;
 import 
org.apache.pinot.segment.local.segment.creator.impl.stats.CLPStatsProvider;
 import 
org.apache.pinot.segment.local.segment.creator.impl.stats.StringColumnPreIndexStatsCollector;
+import org.apache.pinot.segment.spi.index.mutable.MutableForwardIndex;
 import org.apache.pinot.segment.spi.memory.PinotDataBufferMemoryManager;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.testng.Assert;
@@ -34,6 +36,22 @@ import org.testng.annotations.Test;
 
 
 public class CLPMutableForwardIndexTest {
+  private final List<String> _logLines = new ArrayList<>() {{
+    add("2023/10/26 00:03:10.168 INFO [PropertyCache] 
[HelixController-pipeline-default-pinot-(4a02a32c_DEFAULT)] "
+        + "Event pinot::DEFAULT::4a02a32c_DEFAULT : Refreshed 35 property 
LiveInstance took 5 ms. Selective:"
+        + " true");
+    add("2023/10/26 00:03:10.169 INFO [PropertyCache] 
[HelixController-pipeline-default-pinot-(4a02a32d_DEFAULT)] "
+        + "Event pinot::DEFAULT::4a02a32d_DEFAULT : Refreshed 81 property 
LiveInstance took 4 ms. Selective:"
+        + " true");
+    add("2023/10/27 16:35:10.470 INFO [ControllerResponseFilter] 
[grizzly-http-server-2] Handled request from 0.0"
+        + ".0.0 GET https://0.0.0.0:8443/health?checkType=liveness, 
content-type null status code 200 OK");
+    add("2023/10/27 16:35:10.607 INFO [ControllerResponseFilter] 
[grizzly-http-server-6] Handled request from 0.0"
+        + ".0.0 GET 
https://pinot-pinot-broker-headless.managed.svc.cluster.local:8093/tables, 
content-type "
+        + "application/json status code 200 OK");
+    add("null");
+  }};
+  private final int _rows = 3;
+
   private PinotDataBufferMemoryManager _memoryManager;
 
   @BeforeClass
@@ -47,58 +65,60 @@ public class CLPMutableForwardIndexTest {
     _memoryManager.close();
   }
 
+  /**
+   * Test using CLPMutableForwardIndex
+   */
   @Test
   public void testString()
       throws IOException {
     // use arbitrary cardinality and avg string length
     // we will test with complete randomness
     int initialCapacity = 5;
-    int estimatedAvgStringLength = 30;
     try (CLPMutableForwardIndex readerWriter = new 
CLPMutableForwardIndex("col1", FieldSpec.DataType.STRING,
         _memoryManager, initialCapacity)) {
-      int rows = 3;
-      List<String> logLines = new ArrayList<>();
-      logLines.add(
-          "2023/10/26 00:03:10.168 INFO [PropertyCache] 
[HelixController-pipeline-default-pinot-(4a02a32c_DEFAULT)] "
-              + "Event pinot::DEFAULT::4a02a32c_DEFAULT : Refreshed 35 
property LiveInstance took 5 ms. Selective:"
-              + " true");
-      logLines.add(
-          "2023/10/26 00:03:10.169 INFO [PropertyCache] 
[HelixController-pipeline-default-pinot-(4a02a32d_DEFAULT)] "
-              + "Event pinot::DEFAULT::4a02a32d_DEFAULT : Refreshed 81 
property LiveInstance took 4 ms. Selective:"
-              + " true");
-      logLines.add(
-          "2023/10/27 16:35:10.470 INFO [ControllerResponseFilter] 
[grizzly-http-server-2] Handled request from 0.0"
-              + ".0.0 GET https://0.0.0.0:8443/health?checkType=liveness, 
content-type null status code 200 OK");
-      logLines.add(
-          "2023/10/27 16:35:10.607 INFO [ControllerResponseFilter] 
[grizzly-http-server-6] Handled request from 0.0"
-              + ".0.0 GET 
https://pinot-pinot-broker-headless.managed.svc.cluster.local:8093/tables, 
content-type "
-              + "application/json status code 200 OK");
-      logLines.add("null");
+      ingestData(readerWriter);
+      validateStats(readerWriter.getCLPStats());
+    }
+  }
 
-      for (int i = 0; i < rows; i++) {
-        readerWriter.setString(i, logLines.get(i));
-      }
+  /**
+   * Same as testString() except it is using CLPMutableForwardIndexV2
+   */
+  @Test
+  public void testStringV2()
+      throws IOException {
+    try (CLPMutableForwardIndexV2 readerWriter = new 
CLPMutableForwardIndexV2("col1", _memoryManager)) {
+      readerWriter.forceClpEncoding();
+      ingestData(readerWriter);
+      validateStats(readerWriter.getCLPStats());
+    }
+  }
 
-      for (int i = 0; i < rows; i++) {
-        Assert.assertEquals(readerWriter.getString(i), logLines.get(i));
-      }
+  private void ingestData(MutableForwardIndex readerWriter) {
+    for (int i = 0; i < _rows; i++) {
+      readerWriter.setString(i, _logLines.get(i));
+    }
 
-      // Verify clp stats
-      StringColumnPreIndexStatsCollector.CLPStatsCollector statsCollector =
-          new StringColumnPreIndexStatsCollector.CLPStatsCollector();
-      for (int i = 0; i < rows; i++) {
-        statsCollector.collect(logLines.get(i));
-      }
-      statsCollector.seal();
-      CLPStatsProvider.CLPStats stats = statsCollector.getCLPStats();
+    for (int i = 0; i < _rows; i++) {
+      Assert.assertEquals(readerWriter.getString(i), _logLines.get(i));
+    }
+  }
 
-      CLPStatsProvider.CLPStats mutableIndexStats = readerWriter.getCLPStats();
-      Assert.assertEquals(stats.getTotalNumberOfDictVars(), 
mutableIndexStats.getTotalNumberOfDictVars());
-      Assert.assertEquals(stats.getMaxNumberOfEncodedVars(), 
mutableIndexStats.getMaxNumberOfEncodedVars());
-      Assert.assertEquals(stats.getSortedDictVarValues(), 
mutableIndexStats.getSortedDictVarValues());
-      Assert.assertEquals(stats.getTotalNumberOfEncodedVars(), 
mutableIndexStats.getTotalNumberOfEncodedVars());
-      Assert.assertEquals(stats.getSortedLogTypeValues(), 
mutableIndexStats.getSortedLogTypeValues());
-      Assert.assertEquals(stats.getSortedDictVarValues(), 
mutableIndexStats.getSortedDictVarValues());
+  private void validateStats(CLPStatsProvider.CLPStats mutableIndexStats) {
+    // Verify clp stats
+    StringColumnPreIndexStatsCollector.CLPStatsCollector statsCollector =
+        new StringColumnPreIndexStatsCollector.CLPStatsCollector();
+    for (int i = 0; i < _rows; i++) {
+      statsCollector.collect(_logLines.get(i));
     }
+    statsCollector.seal();
+    CLPStatsProvider.CLPStats stats = statsCollector.getCLPStats();
+
+    Assert.assertEquals(stats.getTotalNumberOfDictVars(), 
mutableIndexStats.getTotalNumberOfDictVars());
+    Assert.assertEquals(stats.getMaxNumberOfEncodedVars(), 
mutableIndexStats.getMaxNumberOfEncodedVars());
+    Assert.assertEquals(stats.getSortedDictVarValues(), 
mutableIndexStats.getSortedDictVarValues());
+    Assert.assertEquals(stats.getTotalNumberOfEncodedVars(), 
mutableIndexStats.getTotalNumberOfEncodedVars());
+    Assert.assertEquals(stats.getSortedLogTypeValues(), 
mutableIndexStats.getSortedLogTypeValues());
+    Assert.assertEquals(stats.getSortedDictVarValues(), 
mutableIndexStats.getSortedDictVarValues());
   }
 }
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/mutable/CLPMutableForwardIndexV2Test.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/mutable/CLPMutableForwardIndexV2Test.java
new file mode 100644
index 0000000000..b1824570dc
--- /dev/null
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/mutable/CLPMutableForwardIndexV2Test.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.forward.mutable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.pinot.segment.local.io.writer.impl.DirectMemoryManager;
+import 
org.apache.pinot.segment.local.realtime.impl.forward.CLPMutableForwardIndexV2;
+import org.apache.pinot.segment.spi.memory.PinotDataBufferMemoryManager;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class CLPMutableForwardIndexV2Test {
+  private PinotDataBufferMemoryManager _memoryManager;
+
+  @BeforeClass
+  public void setUp() {
+    _memoryManager = new 
DirectMemoryManager(VarByteSVMutableForwardIndexTest.class.getName());
+  }
+
+  @AfterClass
+  public void tearDown()
+      throws Exception {
+    _memoryManager.close();
+  }
+
+  /**
+   * Sanity check
+   */
+  @Test
+  public void testReadWriteOnLogMessages()
+      throws IOException {
+    try (CLPMutableForwardIndexV2 readerWriter = new 
CLPMutableForwardIndexV2("col1", _memoryManager)) {
+      List<String> logLines = new ArrayList<>();
+      for (int i = 0; i < 10000; i++) {
+        logLines.add("INFO [PropertyCache] 
[HelixController-pipeline-default-pinot-(4a02a32c_DEFAULT)] "
+            + "Event pinot::DEFAULT::4a02a32c_DEFAULT : Refreshed 35 property 
LiveInstance took 5 ms. Selective:"
+            + " true");
+        logLines.add("INFO [PropertyCache] 
[HelixController-pipeline-default-pinot-(4a02a32d_DEFAULT)] "
+            + "Event pinot::DEFAULT::4a02a32d_DEFAULT : Refreshed 81 property 
LiveInstance took 4 ms. Selective:"
+            + " true");
+        logLines.add("INFO [ControllerResponseFilter] [grizzly-http-server-2] 
Handled request from 0.0"
+            + ".0.0 GET https://0.0.0.0:8443/health?checkType=liveness, 
content-type null status code 200 OK");
+        logLines.add("INFO [ControllerResponseFilter] [grizzly-http-server-6] 
Handled request from 0.0"
+            + ".0.0 GET 
https://pinot-pinot-broker-headless.managed.svc.cluster.local:8093/tables, 
content-type "
+            + "application/json status code 200 OK");
+        logLines.add("null");
+      }
+
+      // Typically, log messages should be clp encoded due to low logtype and 
dictionary variable cardinality
+      Assert.assertTrue(readerWriter.isClpEncoded());
+
+      // Write
+      for (int i = 0; i < logLines.size(); i++) {
+        readerWriter.setString(i, logLines.get(i));
+      }
+
+      // Read
+      for (int i = 0; i < logLines.size(); i++) {
+        Assert.assertEquals(readerWriter.getString(i), logLines.get(i));
+      }
+    }
+  }
+
+  @Test
+  public void testClpDictionaryCompression()
+      throws IOException {
+    try (CLPMutableForwardIndexV2 readerWriter = new 
CLPMutableForwardIndexV2("col1", _memoryManager)) {
+      // Write 400,000 logs
+      // Mutable index should contain 4 unique logtypes, 5 unique dictionary variables and 200,000 encoded values
+      for (int i = 0; i < 4 * 100000; i += 4) {
+        readerWriter.setString(i, "static value, dictionaryVar" + i % 5 + ", 
encodedVar: " + i);
+        readerWriter.setString(i + 1, "static value, dictionaryVar" + i % 5);
+        readerWriter.setString(i + 2, "static value, encodedVar: " + i);
+        readerWriter.setString(i + 3, "static value");
+      }
+
+      // Mutable forward index should be clp encoded since cardinality is low
+      Assert.assertTrue(readerWriter.isClpEncoded());
+
+      // Mutable forward index should contain exactly 400,000 documents
+      Assert.assertEquals(readerWriter.getNumDoc(), 400000);
+
+      // Mutable forward index should contain exactly 4 unique logtypes
+      Assert.assertEquals(readerWriter.getLogtypeDict().length(), 4);
+
+      // Mutable forward index should contain exactly 5 unique dictionary 
variables
+      Assert.assertEquals(readerWriter.getDictVarDict().length(), 5);
+
+      // Mutable forward index should contain exactly 200,000 encoded values (rows i and i + 2 each carry one)
+      Assert.assertEquals(readerWriter.getNumEncodedVar(), 200000);
+    }
+  }
+
+  @Test
+  public void testRawEncodingDueToHighLogtypeCardinality()
+      throws IOException {
+    try (CLPMutableForwardIndexV2 readerWriter = new 
CLPMutableForwardIndexV2("col1", _memoryManager)) {
+      // Write 400,000 logs
+      // Each row is a random alphabetic string, so virtually every row produces a distinct logtype
+      for (int i = 0; i < 4 * 100000; i++) {
+        String log = generateRandomString(64);
+        readerWriter.setString(i, log);
+        Assert.assertEquals(readerWriter.getString(i), log);
+      }
+
+      // Mutable forward index should fall back to raw encoding since logtype cardinality is high
+      Assert.assertFalse(readerWriter.isClpEncoded());
+    }
+  }
+
+  @Test
+  public void testRawEncodingDueToHighDictVarCardinality()
+      throws IOException {
+    // Each row appends the row index to a random string, yielding a distinct dictionary variable per row
+    try (CLPMutableForwardIndexV2 readerWriter = new 
CLPMutableForwardIndexV2("col1", _memoryManager)) {
+      // Write 400,000 logs
+      // The input contains 1 unique logtype and 400,000 unique dictVar values
+      for (int i = 0; i < 4 * 100000; i++) {
+        String log = "A log with " + generateRandomString(64) + "-" + i;
+        readerWriter.setString(i, log);
+        Assert.assertEquals(readerWriter.getString(i), log);
+      }
+
+      // Mutable forward index should fall back to raw encoding since dictionary variable cardinality is high
+      Assert.assertFalse(readerWriter.isClpEncoded());
+    }
+  }
+
+  private static final String CHARACTERS = 
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
+
+  private static String generateRandomString(int length) {
+    StringBuilder result = new StringBuilder(length);
+    for (int i = 0; i < length; i++) {
+      // Pick a random character from CHARACTERS string
+      int index = ThreadLocalRandom.current().nextInt(CHARACTERS.length());
+      result.append(CHARACTERS.charAt(index));
+    }
+    return result.toString();
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to