xiangfu0 commented on code in PR #18092:
URL: https://github.com/apache/pinot/pull/18092#discussion_r3036490137
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java:
##########
@@ -2174,22 +2175,45 @@ public static boolean isRelevantToTenant(TableConfig tableConfig, String tenantN
}
private static void validateGorillaCompressionCodecIfPresent(FieldConfig fieldConfig, FieldSpec fieldSpec) {
- if (fieldConfig.getCompressionCodec() == null) {
- return;
- }
- switch (fieldConfig.getCompressionCodec()) {
- case DELTA:
- case DELTADELTA:
+ // Validate legacy compressionCodec DELTA/DELTADELTA constraints
+ if (fieldConfig.getCompressionCodec() != null) {
+ switch (fieldConfig.getCompressionCodec()) {
+ case DELTA:
+ case DELTADELTA:
+ Preconditions.checkState(fieldSpec.isSingleValueField(),
+ "Compression codec %s can only be used on single-value columns, found multi-value column: %s",
+ fieldConfig.getCompressionCodec(), fieldConfig.getName());
+ DataType storedType = fieldSpec.getDataType().getStoredType();
+ Preconditions.checkState(storedType == DataType.INT || storedType == DataType.LONG,
+ "Compression codec %s can only be used on INT/LONG data types, found %s for column: %s",
+ fieldConfig.getCompressionCodec(), storedType, fieldConfig.getName());
+ break;
+ default:
+ // no-op for other codecs
+ }
+ }
+
+ // Validate codecPipeline transform constraints (DELTA, DOUBLE_DELTA require SV INT/LONG)
+ List<String> codecPipeline = fieldConfig.getCodecPipeline();
+ if (codecPipeline != null && !codecPipeline.isEmpty()) {
+ boolean hasTransform = codecPipeline.stream().anyMatch(name -> {
+ try {
+ ChunkCodec codec = ChunkCodec.valueOf(name.trim().toUpperCase());
+ return codec.isTransform();
+ } catch (IllegalArgumentException e) {
+ return false;
+ }
+ });
Review Comment:
Good catch — fixed. The validation now constructs the full pipeline via
`ChunkCodecPipeline.fromNames(codecPipeline)` upfront, which validates all
stage names and rejects internal-only codecs. Invalid/misspelled names will now
fail with a clear `IllegalArgumentException` at table validation time.
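
For readers following the thread, the fail-fast behavior can be sketched with a minimal stand-in. The `Codec` enum and `fromNames` method below are hypothetical simplifications, not the real `ChunkCodec`/`ChunkCodecPipeline` API from pinot-segment-spi:

```java
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;

public class FromNamesSketch {
  // Hypothetical stand-in for the real ChunkCodec enum.
  enum Codec { DELTA, DOUBLE_DELTA, LZ4, ZSTANDARD }

  // Constructing the whole pipeline upfront: any unknown/misspelled name
  // throws IllegalArgumentException instead of being silently skipped,
  // which is what the old anyMatch + catch-and-return-false approach did.
  static List<Codec> fromNames(List<String> names) {
    return names.stream()
        .map(n -> Codec.valueOf(n.trim().toUpperCase(Locale.ROOT)))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    System.out.println(fromNames(List.of("delta", " LZ4 ")));
    try {
      fromNames(List.of("DETLA", "LZ4")); // typo now fails fast
    } catch (IllegalArgumentException e) {
      System.out.println("rejected at validation time");
    }
  }
}
```

With this shape, a typo like `"DETLA"` surfaces at table-config validation rather than being quietly treated as "not a transform".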
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java:
##########
@@ -380,23 +381,37 @@ private boolean shouldChangeRawCompressionType(String column, SegmentDirectory.R
// The compression type for an existing segment can only be determined by reading the forward index header.
ColumnMetadata existingColMetadata = _segmentDirectory.getSegmentMetadata().getColumnMetadataFor(column);
ChunkCompressionType existingCompressionType;
+ ChunkCodecPipeline existingPipeline;
// Get the forward index reader factory and create a reader
IndexReaderFactory<ForwardIndexReader> readerFactory = StandardIndexes.forward().getReaderFactory();
try (ForwardIndexReader<?> fwdIndexReader = readerFactory.createIndexReader(segmentReader,
_fieldIndexConfigs.get(column), existingColMetadata)) {
existingCompressionType = fwdIndexReader.getCompressionType();
+ existingPipeline = fwdIndexReader.getCodecPipeline();
Preconditions.checkState(existingCompressionType != null,
"Existing compressionType cannot be null for raw forward index column=" + column);
}
- // Get the new compression type.
- ChunkCompressionType newCompressionType =
- _fieldIndexConfigs.get(column).getConfig(StandardIndexes.forward()).getChunkCompressionType();
+ ForwardIndexConfig newConfig = _fieldIndexConfigs.get(column).getConfig(StandardIndexes.forward());
+ ChunkCodecPipeline newPipeline = newConfig.getCodecPipeline();
+
+ // newPipeline is non-null for all non-CLP RAW codecs (auto-derived from compressionCodec).
+ // Compare via pipeline when possible; fall back to compressionType for CLP codecs.
+ if (newPipeline != null) {
+ // For legacy segments (pre-V7), derive pipeline from their compressionType for comparison.
+ // e.g., existing DELTA segment → pipeline [DELTA_LZ4]; existing LZ4 segment → pipeline [LZ4].
+ if (existingPipeline == null) {
+ existingPipeline = ChunkCodecPipeline.fromCompressionType(existingCompressionType);
+ }
+ return !existingPipeline.equals(newPipeline);
+ }
+ // Neither has a pipeline (CLP codecs) — compare compression types directly.
// Note that default compression type (PASS_THROUGH for metric and LZ4 for dimension) is not considered if the
// compressionType is not explicitly provided in tableConfig. This is to avoid incorrectly rewriting all the
// forward indexes during segmentReload when the default compressionType changes.
+ ChunkCompressionType newCompressionType = newConfig.getChunkCompressionType();
return newCompressionType != null && existingCompressionType != newCompressionType;
Review Comment:
Acknowledged — the CLP variant detection limitation exists in the
pre-existing code (this PR doesn't change the CLP comparison logic, only adds
the pipeline comparison path above it). CLP codecs have their own dedicated
creator/reader paths and don't participate in the pipeline abstraction.
Improving CLP variant change detection is a separate concern that should be
addressed in a follow-up if needed.
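
To make the comparison path above concrete, here is a minimal, self-contained sketch of the idea. The enums, the `fromCompressionType` mapping, and `shouldRewrite` below are simplified stand-ins, not the actual Pinot `ChunkCompressionType`/`ChunkCodecPipeline` API:

```java
import java.util.List;

public class PipelineCompareSketch {
  // Simplified stand-ins for ChunkCompressionType and pipeline stages.
  enum CompressionType { PASS_THROUGH, LZ4, ZSTANDARD, DELTA }
  enum Stage { PASS_THROUGH, LZ4, ZSTANDARD, DELTA_LZ4 }

  // Pre-V7 segments only persist a compressionType; derive a one-stage
  // pipeline from it so old and new segments compare on the same axis.
  static List<Stage> fromCompressionType(CompressionType type) {
    switch (type) {
      case PASS_THROUGH: return List.of(Stage.PASS_THROUGH);
      case LZ4: return List.of(Stage.LZ4);
      case ZSTANDARD: return List.of(Stage.ZSTANDARD);
      case DELTA: return List.of(Stage.DELTA_LZ4); // DELTA implied an LZ4 terminal stage
      default: throw new IllegalArgumentException("Unmapped: " + type);
    }
  }

  // Rewrite only when the effective pipelines differ.
  static boolean shouldRewrite(List<Stage> existing, CompressionType legacyType, List<Stage> requested) {
    List<Stage> effective = existing != null ? existing : fromCompressionType(legacyType);
    return !effective.equals(requested);
  }

  public static void main(String[] args) {
    // Legacy LZ4 segment, config now asks for [DELTA_LZ4]: rewrite.
    System.out.println(shouldRewrite(null, CompressionType.LZ4, List.of(Stage.DELTA_LZ4)));
    // Legacy DELTA segment already maps to [DELTA_LZ4]: no rewrite.
    System.out.println(shouldRewrite(null, CompressionType.DELTA, List.of(Stage.DELTA_LZ4)));
  }
}
```

The design point is that normalizing both sides to a pipeline avoids spuriously rewriting legacy segments whose single compressionType already matches the derived pipeline.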
##########
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/codec/ChunkCodecPipeline.java:
##########
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.codec;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+
+/**
+ * An immutable, ordered sequence of {@link ChunkCodec} stages that defines how chunk data is
+ * encoded before being written to disk and decoded after being read back.
+ *
+ * <p>Invariants enforced at construction time:
+ * <ul>
+ *   <li>The pipeline is non-empty.</li>
+ *   <li>At most one {@link ChunkCodec.CodecKind#COMPRESSOR COMPRESSOR} stage is present, and it must be the <b>last</b> stage.</li>
+ *   <li>All preceding stages (if any) must be {@link ChunkCodec.CodecKind#TRANSFORM TRANSFORM}.</li>
+ *   <li>Pipeline length is at most {@value #MAX_PIPELINE_LENGTH}.</li>
+ * </ul>
+ *
+ * <p>On write the stages are applied left-to-right (transforms first, then compression).
+ * On read the stages are applied right-to-left (decompress first, then reverse transforms).</p>
+ */
+public final class ChunkCodecPipeline {
+
+ /** Maximum number of codec stages in a single pipeline. */
+ public static final int MAX_PIPELINE_LENGTH = 8;
+
+ /** Default pipeline: a single LZ4 compressor, matching Pinot's historic default. */
+ public static final ChunkCodecPipeline DEFAULT = new ChunkCodecPipeline(Collections.singletonList(ChunkCodec.LZ4));
+
+ private final List<ChunkCodec> _stages;
+
+ /**
+ * Creates a pipeline from an ordered list of codec stages.
+ *
+ * @throws IllegalArgumentException if invariants are violated
+ */
+ public ChunkCodecPipeline(List<ChunkCodec> stages) {
+ Preconditions.checkArgument(stages != null && !stages.isEmpty(), "Pipeline must have at least one stage");
+ Preconditions.checkArgument(stages.size() <= MAX_PIPELINE_LENGTH,
+ "Pipeline length %s exceeds maximum of %s", stages.size(), MAX_PIPELINE_LENGTH);
+
+ // Validate: all transforms first, at most one compressor at the end
+ boolean seenCompressor = false;
+ for (int i = 0; i < stages.size(); i++) {
+ ChunkCodec codec = stages.get(i);
+ Preconditions.checkArgument(codec != null, "Pipeline stage at index %s is null", i);
+ if (codec.isCompressor()) {
+ Preconditions.checkArgument(!seenCompressor, "Pipeline contains more than one compressor: %s", stages);
+ Preconditions.checkArgument(i == stages.size() - 1,
+ "Compressor %s must be the last stage in the pipeline, but found at index %s of %s",
+ codec, i, stages.size());
+ seenCompressor = true;
+ }
+ }
+
+ _stages = Collections.unmodifiableList(new ArrayList<>(stages));
+ }
+
+ /** Returns the ordered, immutable list of codec stages. */
+ public List<ChunkCodec> getStages() {
+ return _stages;
+ }
+
+ /** Returns the number of stages in the pipeline. */
+ public int size() {
+ return _stages.size();
+ }
+
+ /** Returns the codec at the given index. */
+ public ChunkCodec get(int index) {
+ return _stages.get(index);
+ }
+
+ /**
+ * Returns the terminal compressor, or {@link ChunkCodec#PASS_THROUGH} if the pipeline
+ * contains only transforms.
+ */
+ public ChunkCodec getCompressor() {
+ ChunkCodec last = _stages.get(_stages.size() - 1);
+ return last.isCompressor() ? last : ChunkCodec.PASS_THROUGH;
+ }
+
+ /** Returns the transform stages (all stages except the terminal compressor). */
+ public List<ChunkCodec> getTransforms() {
+ ChunkCodec last = _stages.get(_stages.size() - 1);
+ if (last.isCompressor()) {
+ return _stages.subList(0, _stages.size() - 1);
+ }
+ return _stages;
+ }
+
+ /** Returns {@code true} if the pipeline contains any transform stages. */
+ public boolean hasTransforms() {
+ return !getTransforms().isEmpty();
+ }
+
+ /**
+ * Maps the terminal compressor back to a {@link ChunkCompressionType} for backward
+ * compatibility with readers/writers that still use the legacy enum.
+ */
+ public ChunkCompressionType getChunkCompressionType() {
+ ChunkCodec compressor = getCompressor();
+ switch (compressor) {
+ case PASS_THROUGH:
+ return ChunkCompressionType.PASS_THROUGH;
+ case SNAPPY:
+ return ChunkCompressionType.SNAPPY;
+ case ZSTANDARD:
+ return ChunkCompressionType.ZSTANDARD;
+ case LZ4:
+ return ChunkCompressionType.LZ4;
+ case GZIP:
+ return ChunkCompressionType.GZIP;
+ case DELTA_LZ4:
+ return ChunkCompressionType.DELTA;
+ case DOUBLE_DELTA_LZ4:
+ return ChunkCompressionType.DELTADELTA;
+ default:
+ throw new IllegalStateException("No ChunkCompressionType mapping for: " + compressor);
+ }
+ }
+
+ /**
+ * Creates a pipeline from codec names (e.g., {@code ["DELTA", "ZSTANDARD"]}).
+ * Internal-only codecs (see {@link ChunkCodec#isInternalOnly()}) are rejected.
+ *
+ * @throws IllegalArgumentException if a name is unknown or maps to an internal-only codec
+ */
+ public static ChunkCodecPipeline fromNames(List<String> names) {
+ Preconditions.checkArgument(names != null && !names.isEmpty(), "Pipeline names must be non-empty");
+ List<ChunkCodec> stages = names.stream()
+ .map(name -> {
+ String normalized = name.trim().toUpperCase();
+ ChunkCodec codec = ChunkCodec.valueOf(normalized);
+ codec.validateUserFacing(name);
+ return codec;
+ })
+ .collect(Collectors.toList());
Review Comment:
Good catch — fixed. `fromNames()` now validates each element as non-null and non-blank with actionable error messages including the index position:
```java
Preconditions.checkArgument(name != null,
    "Pipeline codec name at index %s must be non-null", i);
Preconditions.checkArgument(!normalized.isEmpty(),
    "Pipeline codec name at index %s must be non-blank (value: '%s')", i, name);
```
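
The class Javadoc specifies the run-time contract: stages apply left-to-right on write and right-to-left on read. A minimal stand-in sketch of that ordering (the `Stage` record and `DELTA` transform below are hypothetical illustrations, not the real `ChunkCodec` API; a real pipeline would end in a COMPRESSOR stage such as LZ4):

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.UnaryOperator;

public class StageOrderSketch {
  // A stage is a reversible pair of operators (stand-in for a TRANSFORM codec).
  record Stage(UnaryOperator<int[]> encode, UnaryOperator<int[]> decode) {}

  // Delta transform: store differences between consecutive values.
  static final Stage DELTA = new Stage(
      v -> { int[] r = v.clone(); for (int i = r.length - 1; i > 0; i--) r[i] -= r[i - 1]; return r; },
      v -> { int[] r = v.clone(); for (int i = 1; i < r.length; i++) r[i] += r[i - 1]; return r; });

  // Write path: apply stages left-to-right.
  static int[] write(int[] values, List<Stage> stages) {
    for (Stage s : stages) values = s.encode().apply(values);
    return values;
  }

  // Read path: apply inverse stages right-to-left.
  static int[] read(int[] encoded, List<Stage> stages) {
    for (int i = stages.size() - 1; i >= 0; i--) encoded = stages.get(i).decode().apply(encoded);
    return encoded;
  }

  public static void main(String[] args) {
    int[] values = {100, 101, 103, 106};
    int[] encoded = write(values, List.of(DELTA));
    System.out.println(Arrays.toString(encoded));                        // [100, 1, 2, 3]
    System.out.println(Arrays.toString(read(encoded, List.of(DELTA))));  // round-trips to the input
  }
}
```

Stacking the same delta stage twice would give a double-delta transform, which mirrors how DELTA and DOUBLE_DELTA stages compose ahead of the terminal compressor.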
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]