xiangfu0 commented on code in PR #18480:
URL: https://github.com/apache/pinot/pull/18480#discussion_r3231102097


##########
pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigConsumingSegmentTierOverrideTest.java:
##########
@@ -0,0 +1,316 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.utils;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.tier.TierFactory;
+import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.spi.index.FieldIndexConfigs;
+import org.apache.pinot.segment.spi.index.StandardIndexes;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.TierConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertSame;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+
+/// Tests for the synthetic `consuming` tier override used while constructing mutable realtime consuming segments.

Review Comment:
   Intentional for this PR. The repo instructions for this work require Java 23 
Markdown documentation comments using `///`; scoped checkstyle passed cleanly.



##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ConsumingSegmentTierOverrideRealtimeTest.java:
##########
@@ -0,0 +1,363 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests.custom;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.helix.model.IdealState;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
+import org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl;
+import org.apache.pinot.segment.local.segment.store.SegmentLocalFSDirectory;
+import org.apache.pinot.segment.spi.ColumnMetadata;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.SegmentMetadata;
+import org.apache.pinot.segment.spi.datasource.DataSource;
+import org.apache.pinot.segment.spi.index.StandardIndexes;
+import org.apache.pinot.segment.spi.store.SegmentDirectory;
+import org.apache.pinot.server.starter.helix.BaseServerStarter;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.ReadMode;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.util.TestUtils;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+
+/// End-to-end coverage for the synthetic `consuming` tier override on a realtime table.
+///
+/// The table persists `profiledString` as RAW with no inverted index. `tierOverwrites.consuming` upgrades that column
+/// to DICTIONARY + INVERTED only while the segment is mutable and consuming. After force-commit, the immutable segment
+/// must retain the persisted RAW/no-inverted shape on disk.

Review Comment:
   Intentional for this PR. The repo instructions for this work require Java 23 
Markdown documentation comments using `///`; scoped checkstyle passed cleanly.



##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CustomDataQueryClusterIntegrationTest.java:
##########
@@ -305,6 +306,11 @@ protected BaseControllerStarter getSharedControllerStarter() {
     return _sharedClusterTestSuite._controllerStarter;
   }
 
+  /// Returns server starters from the shared suite instance.

Review Comment:
   Intentional for this PR. The repo instructions for this work require Java 23 
Markdown documentation comments using `///`; scoped checkstyle passed cleanly.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java:
##########
@@ -2126,6 +2140,194 @@ private static void overwriteConfig(JsonNode oldCfg, 
JsonNode newCfg) {
     }
   }
 
+  public static boolean isConsumingSegmentTier(@Nullable String tier) {
+    return CONSUMING_SEGMENT_TIER.equals(tier);
+  }
+
+  public static boolean isSyntheticConsumingSegmentTier(@Nullable TableConfig 
tableConfig, @Nullable String tier) {

Review Comment:
   Keeping this public intentionally. `IndexLoadingConfig` is in a different 
package and calls this helper to preserve the existing storage-tier overwrite 
path, while `RealtimeSegmentDataManager` uses the companion consuming-builder 
helper. Centralizing this in `TableConfigUtils` avoids duplicating 
tier-overwrite semantics across modules.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java:
##########
@@ -2126,6 +2140,194 @@ private static void overwriteConfig(JsonNode oldCfg, 
JsonNode newCfg) {
     }
   }
 
+  public static boolean isConsumingSegmentTier(@Nullable String tier) {
+    return CONSUMING_SEGMENT_TIER.equals(tier);
+  }
+
+  public static boolean isSyntheticConsumingSegmentTier(@Nullable TableConfig 
tableConfig, @Nullable String tier) {
+    return tableConfig != null && tableConfig.getTableType() == 
TableType.REALTIME && isConsumingSegmentTier(tier)
+        && !hasTierConfig(tableConfig, tier);
+  }
+
+  private static void validateConsumingTierOverwrites(TableConfig tableConfig, 
Schema schema) {
+    if (tableConfig.getTableType() != TableType.REALTIME
+        || !hasTierOverwritesForTier(tableConfig, CONSUMING_SEGMENT_TIER)
+        || hasTierConfig(tableConfig, CONSUMING_SEGMENT_TIER)) {
+      return;
+    }
+    validateConsumingTierOverwriteScope(tableConfig);
+    TableConfig consumingTableConfig = 
overwriteTableConfigForTier(tableConfig, CONSUMING_SEGMENT_TIER);
+    Preconditions.checkState(consumingTableConfig != tableConfig,
+        "Failed to apply tierOverwrites.%s for table: %s", 
CONSUMING_SEGMENT_TIER, tableConfig.getTableName());
+    try {
+      validateIndexingConfigAndFieldConfigList(consumingTableConfig, schema);
+    } catch (RuntimeException e) {
+      throw new IllegalStateException(
+          "tierOverwrites." + CONSUMING_SEGMENT_TIER + " produces an invalid mutable consuming segment config: "
+              + e.getMessage(), e);
+    }
+  }
+
+  private static void validateConsumingTierOverwriteScope(TableConfig 
tableConfig) {
+    IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
+    if (indexingConfig != null) {
+      JsonNode consumingIndexingOverwrite =
+          getTierOverwrite(indexingConfig.getTierOverwrites(), 
CONSUMING_SEGMENT_TIER);
+      if (consumingIndexingOverwrite != null) {
+        Preconditions.checkState(consumingIndexingOverwrite.isObject(),
+            "tableIndexConfig.tierOverwrites.%s must be a JSON object", 
CONSUMING_SEGMENT_TIER);
+        Iterator<String> keys = consumingIndexingOverwrite.fieldNames();
+        while (keys.hasNext()) {
+          String key = keys.next();
+          Preconditions.checkState(CONSUMING_SEGMENT_TIER_INDEXING_CONFIG_KEYS.contains(key),
+              "Unsupported tableIndexConfig.tierOverwrites.%s key: %s; supported keys: %s", CONSUMING_SEGMENT_TIER,
+              key, CONSUMING_SEGMENT_TIER_INDEXING_CONFIG_KEYS);
+        }
+      }
+    }
+    List<FieldConfig> fieldConfigList = tableConfig.getFieldConfigList();
+    if (fieldConfigList == null) {
+      return;
+    }
+    for (FieldConfig fieldConfig : fieldConfigList) {
+      JsonNode consumingFieldOverwrite = 
getTierOverwrite(fieldConfig.getTierOverwrites(), CONSUMING_SEGMENT_TIER);
+      if (consumingFieldOverwrite == null) {
+        continue;
+      }
+      String columnName = fieldConfig.getName();
+      Preconditions.checkState(consumingFieldOverwrite.isObject(),
+          "fieldConfigList[%s].tierOverwrites.%s must be a JSON object", 
columnName, CONSUMING_SEGMENT_TIER);
+      Iterator<String> keys = consumingFieldOverwrite.fieldNames();
+      while (keys.hasNext()) {
+        String key = keys.next();
+        Preconditions.checkState(CONSUMING_SEGMENT_TIER_FIELD_CONFIG_KEYS.contains(key),
+            "Unsupported fieldConfigList[%s].tierOverwrites.%s key: %s; supported keys: %s", columnName,
+            CONSUMING_SEGMENT_TIER, key, CONSUMING_SEGMENT_TIER_FIELD_CONFIG_KEYS);
+        JsonNode value = consumingFieldOverwrite.get(key);
+        if ("indexType".equals(key)) {
+          validateConsumingFieldIndexType(columnName, value);
+        } else if ("indexTypes".equals(key)) {
+          validateConsumingFieldIndexTypes(columnName, value);
+        } else if ("indexes".equals(key)) {
+          validateConsumingFieldIndexes(columnName, value);
+        }
+      }
+    }
+  }
+
+  private static void validateConsumingFieldIndexType(String columnName, 
JsonNode indexTypeNode) {
+    if (indexTypeNode == null || indexTypeNode.isNull()) {
+      return;
+    }
+    Preconditions.checkState(indexTypeNode.isTextual(),
+        "fieldConfigList[%s].tierOverwrites.%s.indexType must be a string", 
columnName, CONSUMING_SEGMENT_TIER);
+    FieldConfig.IndexType indexType;
+    try {
+      indexType = FieldConfig.IndexType.valueOf(indexTypeNode.asText());
+    } catch (IllegalArgumentException e) {
+      throw new IllegalStateException(String.format(
+          "Unsupported fieldConfigList[%s].tierOverwrites.%s indexType: %s; supported indexTypes: %s", columnName,
+          CONSUMING_SEGMENT_TIER, indexTypeNode.asText(), CONSUMING_SEGMENT_TIER_FIELD_INDEX_TYPES), e);
+    }
+    Preconditions.checkState(CONSUMING_SEGMENT_TIER_FIELD_INDEX_TYPES.contains(indexType),
+        "Unsupported fieldConfigList[%s].tierOverwrites.%s indexType: %s; supported indexTypes: %s", columnName,
+        CONSUMING_SEGMENT_TIER, indexType, CONSUMING_SEGMENT_TIER_FIELD_INDEX_TYPES);
+  }
+
+  private static void validateConsumingFieldIndexTypes(String columnName, 
JsonNode indexTypesNode) {
+    if (indexTypesNode == null || indexTypesNode.isNull()) {
+      return;
+    }
+    Preconditions.checkState(indexTypesNode.isArray(),
+        "fieldConfigList[%s].tierOverwrites.%s.indexTypes must be an array", 
columnName, CONSUMING_SEGMENT_TIER);
+    for (JsonNode indexTypeNode : indexTypesNode) {
+      validateConsumingFieldIndexType(columnName, indexTypeNode);
+    }
+  }
+
+  private static void validateConsumingFieldIndexes(String columnName, 
JsonNode indexesNode) {
+    if (indexesNode == null || indexesNode.isNull()) {
+      return;
+    }
+    Preconditions.checkState(indexesNode.isObject(),
+        "fieldConfigList[%s].tierOverwrites.%s.indexes must be a JSON object", 
columnName, CONSUMING_SEGMENT_TIER);
+    Iterator<String> indexNames = indexesNode.fieldNames();
+    while (indexNames.hasNext()) {
+      String indexName = indexNames.next();
+      Preconditions.checkState(CONSUMING_SEGMENT_TIER_FIELD_INDEX_NAMES.contains(indexName),
+          "Unsupported fieldConfigList[%s].tierOverwrites.%s index: %s; supported indexes: %s", columnName,
+          CONSUMING_SEGMENT_TIER, indexName, CONSUMING_SEGMENT_TIER_FIELD_INDEX_NAMES);
+    }
+  }
+
+  /// Builds the [RealtimeSegmentConfig.Builder] for a mutable consuming segment. If the table config contains
+  /// `tierOverwrites.consuming` under `tableIndexConfig` or a `fieldConfigList` entry, the builder is created from the
+  /// existing tier-overwrite view for that synthetic tier. Only index-loading settings are supported for
+  /// `tableIndexConfig.tierOverwrites.consuming`; settings that control row shape or ingestion behavior must stay on
+  /// the persisted table config. If `consuming` is already configured as a real storage tier, storage-tier semantics
+  /// take precedence for backward compatibility. The original [IndexLoadingConfig] is left untouched so the commit path
+  /// and immutable segment load path continue to use the persisted table config and real segment tier.

Review Comment:
   Intentional for this PR. The repo instructions for this work require Java 23 
Markdown documentation comments using contiguous `///` and square-bracket 
element references. The scoped `checkstyle:check` run passed cleanly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to