xiangfu0 commented on code in PR #18480: URL: https://github.com/apache/pinot/pull/18480#discussion_r3231102097
########## pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigConsumingSegmentTierOverrideTest.java: ########## @@ -0,0 +1,316 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.utils; + +import java.util.List; +import java.util.Map; +import org.apache.pinot.common.tier.TierFactory; +import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig; +import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; +import org.apache.pinot.segment.spi.index.FieldIndexConfigs; +import org.apache.pinot.segment.spi.index.StandardIndexes; +import org.apache.pinot.spi.config.table.FieldConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.config.table.TierConfig; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.utils.JsonUtils; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertSame; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; + + +/// Tests for the synthetic `consuming` tier override used while constructing mutable realtime consuming segments. Review Comment: Intentional for this PR. The repo instructions for this work require Java 23 Markdown documentation comments using `///`; scoped checkstyle passed cleanly. ########## pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ConsumingSegmentTierOverrideRealtimeTest.java: ########## @@ -0,0 +1,363 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.integration.tests.custom; + +import com.fasterxml.jackson.databind.JsonNode; +import java.io.File; +import java.io.IOException; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.generic.GenericData; +import org.apache.helix.model.IdealState; +import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; +import org.apache.pinot.common.utils.LLCSegmentName; +import org.apache.pinot.segment.local.data.manager.SegmentDataManager; +import org.apache.pinot.segment.local.data.manager.TableDataManager; +import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl; +import org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl; +import org.apache.pinot.segment.local.segment.store.SegmentLocalFSDirectory; +import org.apache.pinot.segment.spi.ColumnMetadata; +import org.apache.pinot.segment.spi.IndexSegment; +import org.apache.pinot.segment.spi.SegmentMetadata; +import org.apache.pinot.segment.spi.datasource.DataSource; +import org.apache.pinot.segment.spi.index.StandardIndexes; +import org.apache.pinot.segment.spi.store.SegmentDirectory; +import org.apache.pinot.server.starter.helix.BaseServerStarter; +import org.apache.pinot.spi.config.table.FieldConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.JsonUtils; +import org.apache.pinot.spi.utils.ReadMode; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.apache.pinot.util.TestUtils; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; + + +/// End-to-end coverage for the synthetic `consuming` tier override on a realtime table. +/// +/// The table persists `profiledString` as RAW with no inverted index. `tierOverwrites.consuming` upgrades that column +/// to DICTIONARY + INVERTED only while the segment is mutable and consuming. After force-commit, the immutable segment +/// must retain the persisted RAW/no-inverted shape on disk. Review Comment: Intentional for this PR. The repo instructions for this work require Java 23 Markdown documentation comments using `///`; scoped checkstyle passed cleanly. ########## pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CustomDataQueryClusterIntegrationTest.java: ########## @@ -305,6 +306,11 @@ protected BaseControllerStarter getSharedControllerStarter() { return _sharedClusterTestSuite._controllerStarter; } + /// Returns server starters from the shared suite instance. Review Comment: Intentional for this PR. The repo instructions for this work require Java 23 Markdown documentation comments using `///`; scoped checkstyle passed cleanly. ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java: ########## @@ -2126,6 +2140,194 @@ private static void overwriteConfig(JsonNode oldCfg, JsonNode newCfg) { } } + public static boolean isConsumingSegmentTier(@Nullable String tier) { + return CONSUMING_SEGMENT_TIER.equals(tier); + } + + public static boolean isSyntheticConsumingSegmentTier(@Nullable TableConfig tableConfig, @Nullable String tier) { Review Comment: Keeping this public intentionally. `IndexLoadingConfig` is in a different package and calls this helper to preserve the existing storage-tier overwrite path, while `RealtimeSegmentDataManager` uses the companion consuming-builder helper. Centralizing this in `TableConfigUtils` avoids duplicating tier-overwrite semantics across modules. ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java: ########## @@ -2126,6 +2140,194 @@ private static void overwriteConfig(JsonNode oldCfg, JsonNode newCfg) { } } + public static boolean isConsumingSegmentTier(@Nullable String tier) { + return CONSUMING_SEGMENT_TIER.equals(tier); + } + + public static boolean isSyntheticConsumingSegmentTier(@Nullable TableConfig tableConfig, @Nullable String tier) { + return tableConfig != null && tableConfig.getTableType() == TableType.REALTIME && isConsumingSegmentTier(tier) + && !hasTierConfig(tableConfig, tier); + } + + private static void validateConsumingTierOverwrites(TableConfig tableConfig, Schema schema) { + if (tableConfig.getTableType() != TableType.REALTIME + || !hasTierOverwritesForTier(tableConfig, CONSUMING_SEGMENT_TIER) + || hasTierConfig(tableConfig, CONSUMING_SEGMENT_TIER)) { + return; + } + validateConsumingTierOverwriteScope(tableConfig); + TableConfig consumingTableConfig = overwriteTableConfigForTier(tableConfig, CONSUMING_SEGMENT_TIER); + Preconditions.checkState(consumingTableConfig != tableConfig, + "Failed to apply tierOverwrites.%s for table: %s", CONSUMING_SEGMENT_TIER, tableConfig.getTableName()); + try { + validateIndexingConfigAndFieldConfigList(consumingTableConfig, schema); + } catch (RuntimeException e) { + throw new IllegalStateException( + "tierOverwrites." + CONSUMING_SEGMENT_TIER + " produces an invalid mutable consuming segment config: " + + e.getMessage(), e); + } + } + + private static void validateConsumingTierOverwriteScope(TableConfig tableConfig) { + IndexingConfig indexingConfig = tableConfig.getIndexingConfig(); + if (indexingConfig != null) { + JsonNode consumingIndexingOverwrite = + getTierOverwrite(indexingConfig.getTierOverwrites(), CONSUMING_SEGMENT_TIER); + if (consumingIndexingOverwrite != null) { + Preconditions.checkState(consumingIndexingOverwrite.isObject(), + "tableIndexConfig.tierOverwrites.%s must be a JSON object", CONSUMING_SEGMENT_TIER); + Iterator<String> keys = consumingIndexingOverwrite.fieldNames(); + while (keys.hasNext()) { + String key = keys.next(); + Preconditions.checkState(CONSUMING_SEGMENT_TIER_INDEXING_CONFIG_KEYS.contains(key), + "Unsupported tableIndexConfig.tierOverwrites.%s key: %s; supported keys: %s", CONSUMING_SEGMENT_TIER, + key, CONSUMING_SEGMENT_TIER_INDEXING_CONFIG_KEYS); + } + } + } + List<FieldConfig> fieldConfigList = tableConfig.getFieldConfigList(); + if (fieldConfigList == null) { + return; + } + for (FieldConfig fieldConfig : fieldConfigList) { + JsonNode consumingFieldOverwrite = getTierOverwrite(fieldConfig.getTierOverwrites(), CONSUMING_SEGMENT_TIER); + if (consumingFieldOverwrite == null) { + continue; + } + String columnName = fieldConfig.getName(); + Preconditions.checkState(consumingFieldOverwrite.isObject(), + "fieldConfigList[%s].tierOverwrites.%s must be a JSON object", columnName, CONSUMING_SEGMENT_TIER); + Iterator<String> keys = consumingFieldOverwrite.fieldNames(); + while (keys.hasNext()) { + String key = keys.next(); + Preconditions.checkState(CONSUMING_SEGMENT_TIER_FIELD_CONFIG_KEYS.contains(key), + "Unsupported fieldConfigList[%s].tierOverwrites.%s key: %s; supported keys: %s", columnName, + CONSUMING_SEGMENT_TIER, key, CONSUMING_SEGMENT_TIER_FIELD_CONFIG_KEYS); + JsonNode value = consumingFieldOverwrite.get(key); + if ("indexType".equals(key)) { + validateConsumingFieldIndexType(columnName, value); + } else if ("indexTypes".equals(key)) { + validateConsumingFieldIndexTypes(columnName, value); + } else if ("indexes".equals(key)) { + validateConsumingFieldIndexes(columnName, value); + } + } + } + } + + private static void validateConsumingFieldIndexType(String columnName, JsonNode indexTypeNode) { + if (indexTypeNode == null || indexTypeNode.isNull()) { + return; + } + Preconditions.checkState(indexTypeNode.isTextual(), + "fieldConfigList[%s].tierOverwrites.%s.indexType must be a string", columnName, CONSUMING_SEGMENT_TIER); + FieldConfig.IndexType indexType; + try { + indexType = FieldConfig.IndexType.valueOf(indexTypeNode.asText()); + } catch (IllegalArgumentException e) { + throw new IllegalStateException(String.format( + "Unsupported fieldConfigList[%s].tierOverwrites.%s indexType: %s; supported indexTypes: %s", columnName, + CONSUMING_SEGMENT_TIER, indexTypeNode.asText(), CONSUMING_SEGMENT_TIER_FIELD_INDEX_TYPES), e); + } + Preconditions.checkState(CONSUMING_SEGMENT_TIER_FIELD_INDEX_TYPES.contains(indexType), + "Unsupported fieldConfigList[%s].tierOverwrites.%s indexType: %s; supported indexTypes: %s", columnName, + CONSUMING_SEGMENT_TIER, indexType, CONSUMING_SEGMENT_TIER_FIELD_INDEX_TYPES); + } + + private static void validateConsumingFieldIndexTypes(String columnName, JsonNode indexTypesNode) { + if (indexTypesNode == null || indexTypesNode.isNull()) { + return; + } + Preconditions.checkState(indexTypesNode.isArray(), + "fieldConfigList[%s].tierOverwrites.%s.indexTypes must be an array", columnName, CONSUMING_SEGMENT_TIER); + for (JsonNode indexTypeNode : indexTypesNode) { + validateConsumingFieldIndexType(columnName, indexTypeNode); + } + } + + private static void validateConsumingFieldIndexes(String columnName, JsonNode indexesNode) { + if (indexesNode == null || indexesNode.isNull()) { + return; + } + Preconditions.checkState(indexesNode.isObject(), + "fieldConfigList[%s].tierOverwrites.%s.indexes must be a JSON object", columnName, CONSUMING_SEGMENT_TIER); + Iterator<String> indexNames = indexesNode.fieldNames(); + while (indexNames.hasNext()) { + String indexName = indexNames.next(); + Preconditions.checkState(CONSUMING_SEGMENT_TIER_FIELD_INDEX_NAMES.contains(indexName), + "Unsupported fieldConfigList[%s].tierOverwrites.%s index: %s; supported indexes: %s", columnName, + CONSUMING_SEGMENT_TIER, indexName, CONSUMING_SEGMENT_TIER_FIELD_INDEX_NAMES); + } + } + + /// Builds the [RealtimeSegmentConfig.Builder] for a mutable consuming segment. If the table config contains + /// `tierOverwrites.consuming` under `tableIndexConfig` or a `fieldConfigList` entry, the builder is created from the + /// existing tier-overwrite view for that synthetic tier. Only index-loading settings are supported for + /// `tableIndexConfig.tierOverwrites.consuming`; settings that control row shape or ingestion behavior must stay on + /// the persisted table config. If `consuming` is already configured as a real storage tier, storage-tier semantics + /// take precedence for backward compatibility. The original [IndexLoadingConfig] is left untouched so the commit path + /// and immutable segment load path continue to use the persisted table config and real segment tier. Review Comment: Intentional for this PR. The repo instructions for this work require Java 23 Markdown documentation comments using contiguous `///` and square-bracket element references. The scoped `checkstyle:check` run passed cleanly. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
