xiangfu0 commented on code in PR #18092:
URL: https://github.com/apache/pinot/pull/18092#discussion_r3036490137


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java:
##########
@@ -2174,22 +2175,45 @@ public static boolean isRelevantToTenant(TableConfig tableConfig, String tenantN
   }
 
   private static void validateGorillaCompressionCodecIfPresent(FieldConfig fieldConfig, FieldSpec fieldSpec) {
-    if (fieldConfig.getCompressionCodec() == null) {
-      return;
-    }
-    switch (fieldConfig.getCompressionCodec()) {
-      case DELTA:
-      case DELTADELTA:
+    // Validate legacy compressionCodec DELTA/DELTADELTA constraints
+    if (fieldConfig.getCompressionCodec() != null) {
+      switch (fieldConfig.getCompressionCodec()) {
+        case DELTA:
+        case DELTADELTA:
+          Preconditions.checkState(fieldSpec.isSingleValueField(),
+              "Compression codec %s can only be used on single-value columns, found multi-value column: %s",
+              fieldConfig.getCompressionCodec(), fieldConfig.getName());
+          DataType storedType = fieldSpec.getDataType().getStoredType();
+          Preconditions.checkState(storedType == DataType.INT || storedType == DataType.LONG,
+              "Compression codec %s can only be used on INT/LONG data types, found %s for column: %s",
+              fieldConfig.getCompressionCodec(), storedType, fieldConfig.getName());
+          break;
+        default:
+          // no-op for other codecs
+      }
+    }
+
+    // Validate codecPipeline transform constraints (DELTA, DOUBLE_DELTA require SV INT/LONG)
+    List<String> codecPipeline = fieldConfig.getCodecPipeline();
+    if (codecPipeline != null && !codecPipeline.isEmpty()) {
+      boolean hasTransform = codecPipeline.stream().anyMatch(name -> {
+        try {
+          ChunkCodec codec = ChunkCodec.valueOf(name.trim().toUpperCase());
+          return codec.isTransform();
+        } catch (IllegalArgumentException e) {
+          return false;
+        }
+      });

Review Comment:
   Good catch, fixed. The validation now builds the full pipeline up front via
   `ChunkCodecPipeline.fromNames(codecPipeline)`, which validates every stage
   name and rejects internal-only codecs. Invalid or misspelled names now fail
   with a clear `IllegalArgumentException` at table validation time instead of
   being silently treated as non-transform stages.
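
To illustrate why resolving every name up front matters, here is a minimal, self-contained sketch. It does not use Pinot's classes: `SketchCodec` and both helper methods are hypothetical stand-ins, and the codec names and transform flags are assumptions made for the example. The lenient variant mirrors the `try`/`catch` shown in the hunk above, while the strict variant fails on the first unknown name.

```java
import java.util.List;
import java.util.Locale;

final class PipelineNameValidation {
  // Hypothetical stand-in for the PR's ChunkCodec enum; values and flags are assumptions.
  enum SketchCodec {
    DELTA(true), DOUBLE_DELTA(true), LZ4(false), ZSTANDARD(false);

    private final boolean _transform;

    SketchCodec(boolean transform) {
      _transform = transform;
    }

    boolean isTransform() {
      return _transform;
    }
  }

  private PipelineNameValidation() {
  }

  // Lenient check: an unknown name is swallowed and counted as "not a transform",
  // so a typo such as "DETLA" skips the transform constraints without any error.
  static boolean hasTransformLenient(List<String> names) {
    return names.stream().anyMatch(name -> {
      try {
        return SketchCodec.valueOf(name.trim().toUpperCase(Locale.ROOT)).isTransform();
      } catch (IllegalArgumentException e) {
        return false;
      }
    });
  }

  // Strict check: every name is resolved first, so Enum.valueOf throws
  // IllegalArgumentException for an unknown name before any constraint is evaluated.
  static boolean hasTransformStrict(List<String> names) {
    boolean hasTransform = false;
    for (String name : names) {
      SketchCodec codec = SketchCodec.valueOf(name.trim().toUpperCase(Locale.ROOT));
      hasTransform |= codec.isTransform();
    }
    return hasTransform;
  }
}
```

With the list `["DETLA", "LZ4"]`, the lenient variant returns `false` and the strict variant throws, which is the behavior change described in the comment above.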



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java:
##########
@@ -380,23 +381,37 @@ private boolean shouldChangeRawCompressionType(String column, SegmentDirectory.R
     // The compression type for an existing segment can only be determined by reading the forward index header.
     ColumnMetadata existingColMetadata = _segmentDirectory.getSegmentMetadata().getColumnMetadataFor(column);
     ChunkCompressionType existingCompressionType;
+    ChunkCodecPipeline existingPipeline;
 
     // Get the forward index reader factory and create a reader
     IndexReaderFactory<ForwardIndexReader> readerFactory = StandardIndexes.forward().getReaderFactory();
     try (ForwardIndexReader<?> fwdIndexReader = readerFactory.createIndexReader(segmentReader,
         _fieldIndexConfigs.get(column), existingColMetadata)) {
       existingCompressionType = fwdIndexReader.getCompressionType();
+      existingPipeline = fwdIndexReader.getCodecPipeline();
       Preconditions.checkState(existingCompressionType != null,
           "Existing compressionType cannot be null for raw forward index column=" + column);
     }
 
-    // Get the new compression type.
-    ChunkCompressionType newCompressionType =
-        _fieldIndexConfigs.get(column).getConfig(StandardIndexes.forward()).getChunkCompressionType();
+    ForwardIndexConfig newConfig = _fieldIndexConfigs.get(column).getConfig(StandardIndexes.forward());
+    ChunkCodecPipeline newPipeline = newConfig.getCodecPipeline();
+
+    // newPipeline is non-null for all non-CLP RAW codecs (auto-derived from compressionCodec).
+    // Compare via pipeline when possible; fall back to compressionType for CLP codecs.
+    if (newPipeline != null) {
+      // For legacy segments (pre-V7), derive pipeline from their compressionType for comparison.
+      // e.g., existing DELTA segment → pipeline [DELTA_LZ4]; existing LZ4 segment → pipeline [LZ4].
+      if (existingPipeline == null) {
+        existingPipeline = ChunkCodecPipeline.fromCompressionType(existingCompressionType);
+      }
+      }
+      return !existingPipeline.equals(newPipeline);
+    }
 
+    // Neither has a pipeline (CLP codecs) — compare compression types directly.
     // Note that default compression type (PASS_THROUGH for metric and LZ4 for dimension) is not considered if the
     // compressionType is not explicitly provided in tableConfig. This is to avoid incorrectly rewriting all the
     // forward indexes during segmentReload when the default compressionType changes.
+    ChunkCompressionType newCompressionType = newConfig.getChunkCompressionType();
     return newCompressionType != null && existingCompressionType != newCompressionType;

Review Comment:
   Acknowledged. The CLP variant detection limitation already exists in the
   code before this PR: the PR leaves the CLP comparison logic unchanged and
   only adds the pipeline comparison path above it. CLP codecs have their own
   dedicated creator/reader paths and don't participate in the pipeline
   abstraction. Improving CLP variant change detection is a separate concern
   that can be addressed in a follow-up if needed.
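
The two comparison paths in the hunk above can be sketched in isolation as follows. This is an illustrative model only: pipelines and compression types are represented as plain strings, `PipelineChangeCheck` and `shouldRewrite` are hypothetical names, and the legacy mapping contains just the two entries mentioned in the code comment (DELTA to `[DELTA_LZ4]`, LZ4 to `[LZ4]`).

```java
import java.util.List;
import java.util.Map;

final class PipelineChangeCheck {
  // Assumed legacy mapping, limited to the two examples given in the diff's code comment.
  private static final Map<String, List<String>> LEGACY_PIPELINES = Map.of(
      "LZ4", List.of("LZ4"),
      "DELTA", List.of("DELTA_LZ4"));

  private PipelineChangeCheck() {
  }

  // existingType is assumed non-null, matching the checkState in the diff.
  static boolean shouldRewrite(List<String> existingPipeline, String existingType,
      List<String> newPipeline, String newType) {
    if (newPipeline != null) {
      // Pipeline path: a segment without a stored pipeline gets one derived from its legacy type.
      // An unmapped legacy type yields null here, which compares as "different".
      List<String> existing = existingPipeline != null ? existingPipeline : LEGACY_PIPELINES.get(existingType);
      return !newPipeline.equals(existing);
    }
    // Fallback path: no new pipeline, so compare types, and only when the new type is explicit.
    return newType != null && !newType.equals(existingType);
  }
}
```

Under these assumptions, a legacy LZ4 segment compared against a new `[LZ4]` pipeline is left alone, while the same segment compared against `[DELTA, ZSTANDARD]` is flagged for rewrite.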



##########
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/codec/ChunkCodecPipeline.java:
##########
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.compression;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+
+/**
+ * An immutable, ordered sequence of {@link ChunkCodec} stages that defines how chunk data is
+ * encoded before being written to disk and decoded after being read back.
+ *
+ * <p>Invariants enforced at construction time:
+ * <ul>
+ *   <li>The pipeline is non-empty.</li>
+ *   <li>At most one {@link ChunkCodec.CodecKind#COMPRESSOR COMPRESSOR} stage is present,
+ *       and it must be the <b>last</b> stage.</li>
+ *   <li>All preceding stages (if any) must be {@link ChunkCodec.CodecKind#TRANSFORM TRANSFORM}.</li>
+ *   <li>Pipeline length is at most {@value #MAX_PIPELINE_LENGTH}.</li>
+ * </ul>
+ *
+ * <p>On write the stages are applied left-to-right (transforms first, then compression).
+ * On read the stages are applied right-to-left (decompress first, then reverse transforms).</p>
+ */
+public final class ChunkCodecPipeline {
+
+  /** Maximum number of codec stages in a single pipeline. */
+  public static final int MAX_PIPELINE_LENGTH = 8;
+
+  /** Default pipeline: a single LZ4 compressor, matching Pinot's historic default. */
+  public static final ChunkCodecPipeline DEFAULT = new ChunkCodecPipeline(
+      Collections.singletonList(ChunkCodec.LZ4));
+
+  private final List<ChunkCodec> _stages;
+
+  /**
+   * Creates a pipeline from an ordered list of codec stages.
+   *
+   * @throws IllegalArgumentException if invariants are violated
+   */
+  public ChunkCodecPipeline(List<ChunkCodec> stages) {
+    Preconditions.checkArgument(stages != null && !stages.isEmpty(), "Pipeline must have at least one stage");
+    Preconditions.checkArgument(stages.size() <= MAX_PIPELINE_LENGTH,
+        "Pipeline length %s exceeds maximum of %s", stages.size(), MAX_PIPELINE_LENGTH);
+
+    // Validate: all transforms first, at most one compressor at the end
+    boolean seenCompressor = false;
+    for (int i = 0; i < stages.size(); i++) {
+      ChunkCodec codec = stages.get(i);
+      Preconditions.checkArgument(codec != null, "Pipeline stage at index %s is null", i);
+      if (codec.isCompressor()) {
+        Preconditions.checkArgument(!seenCompressor, "Pipeline contains more than one compressor: %s", stages);
+        Preconditions.checkArgument(i == stages.size() - 1,
+            "Compressor %s must be the last stage in the pipeline, but found at index %s of %s",
+            codec, i, stages.size());
+        seenCompressor = true;
+      }
+    }
+
+    _stages = Collections.unmodifiableList(new ArrayList<>(stages));
+  }
+
+  /** Returns the ordered, immutable list of codec stages. */
+  public List<ChunkCodec> getStages() {
+    return _stages;
+  }
+
+  /** Returns the number of stages in the pipeline. */
+  public int size() {
+    return _stages.size();
+  }
+
+  /** Returns the codec at the given index. */
+  public ChunkCodec get(int index) {
+    return _stages.get(index);
+  }
+
+  /**
+   * Returns the terminal compressor, or {@link ChunkCodec#PASS_THROUGH} if the pipeline
+   * contains only transforms.
+   */
+  public ChunkCodec getCompressor() {
+    ChunkCodec last = _stages.get(_stages.size() - 1);
+    return last.isCompressor() ? last : ChunkCodec.PASS_THROUGH;
+  }
+
+  /** Returns the transform stages (all stages except the terminal compressor). */
+  public List<ChunkCodec> getTransforms() {
+    ChunkCodec last = _stages.get(_stages.size() - 1);
+    if (last.isCompressor()) {
+      return _stages.subList(0, _stages.size() - 1);
+    }
+    return _stages;
+  }
+
+  /** Returns {@code true} if the pipeline contains any transform stages. */
+  public boolean hasTransforms() {
+    return !getTransforms().isEmpty();
+  }
+
+  /**
+   * Maps the terminal compressor back to a {@link ChunkCompressionType} for backward
+   * compatibility with readers/writers that still use the legacy enum.
+   */
+  public ChunkCompressionType getChunkCompressionType() {
+    ChunkCodec compressor = getCompressor();
+    switch (compressor) {
+      case PASS_THROUGH:
+        return ChunkCompressionType.PASS_THROUGH;
+      case SNAPPY:
+        return ChunkCompressionType.SNAPPY;
+      case ZSTANDARD:
+        return ChunkCompressionType.ZSTANDARD;
+      case LZ4:
+        return ChunkCompressionType.LZ4;
+      case GZIP:
+        return ChunkCompressionType.GZIP;
+      case DELTA_LZ4:
+        return ChunkCompressionType.DELTA;
+      case DOUBLE_DELTA_LZ4:
+        return ChunkCompressionType.DELTADELTA;
+      default:
+        throw new IllegalStateException("No ChunkCompressionType mapping for: " + compressor);
+    }
+  }
+
+  /**
+   * Creates a pipeline from codec names (e.g., {@code ["DELTA", "ZSTANDARD"]}).
+   * Internal-only codecs (see {@link ChunkCodec#isInternalOnly()}) are rejected.
+   *
+   * @throws IllegalArgumentException if names contain internal-only codecs
+   */
+  public static ChunkCodecPipeline fromNames(List<String> names) {
+    Preconditions.checkArgument(names != null && !names.isEmpty(), "Pipeline names must be non-empty");
+    List<ChunkCodec> stages = names.stream()
+        .map(name -> {
+          String normalized = name.trim().toUpperCase();
+          ChunkCodec codec = ChunkCodec.valueOf(normalized);
+          codec.validateUserFacing(name);
+          return codec;
+        })
+        .collect(Collectors.toList());

Review Comment:
   Good catch, fixed. `fromNames()` now validates that each element is non-null
   and non-blank, with actionable error messages that include the index position:
   ```java
   Preconditions.checkArgument(name != null,
       "Pipeline codec name at index %s must be non-null", i);
   Preconditions.checkArgument(!normalized.isEmpty(),
       "Pipeline codec name at index %s must be non-blank (value: '%s')", i, name);
   ```
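
For context, the per-element checks quoted above need an index, so they fit an indexed loop more naturally than the stream shown in the hunk. Below is a minimal sketch of that shape using only the JDK. `CodecNameChecks` and `normalizeNames` are hypothetical names, plain `IllegalArgumentException`s stand in for Guava's `Preconditions`, and resolving the normalized names to codecs is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

final class CodecNameChecks {
  private CodecNameChecks() {
  }

  // Trims and upper-cases each name, rejecting null or blank entries with the offending index.
  static List<String> normalizeNames(List<String> names) {
    if (names == null || names.isEmpty()) {
      throw new IllegalArgumentException("Pipeline names must be non-empty");
    }
    List<String> normalized = new ArrayList<>(names.size());
    for (int i = 0; i < names.size(); i++) {
      String name = names.get(i);
      if (name == null) {
        throw new IllegalArgumentException("Pipeline codec name at index " + i + " must be non-null");
      }
      String candidate = name.trim().toUpperCase(Locale.ROOT);
      if (candidate.isEmpty()) {
        throw new IllegalArgumentException(
            "Pipeline codec name at index " + i + " must be non-blank (value: '" + name + "')");
      }
      normalized.add(candidate);
    }
    return normalized;
  }
}
```

Checking for null before calling `trim()` is what turns a bare `NullPointerException` into a message that points at the bad entry.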



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

