This is an automated email from the ASF dual-hosted git repository.
xiangfu0 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 9e2335de3c3 Support consuming tier overrides for realtime segments
(#18480)
9e2335de3c3 is described below
commit 9e2335de3c3f252eef1147b7af7f5e1d16fde403
Author: Xiang Fu <[email protected]>
AuthorDate: Tue Jun 2 20:07:51 2026 -0700
Support consuming tier overrides for realtime segments (#18480)
Reuse the existing tierOverwrites mechanism to override index settings for
mutable realtime consuming segments only. A realtime table opts in by
defining tierOverwrites.consuming, so no separate tableIndexConfig feature
flag is needed.
Keep generic storage-tier override behavior tied to actual segment tier
metadata. If tierConfigs declares a real storage tier named consuming, that
name keeps normal storage-tier semantics for backward compatibility.
Apply the consuming override first, then validate the effective consuming
table config through the normal validation path. Persisted segment commit and
immutable segment loading continue to use the base table config unless normal
storage-tier metadata applies during segment loading.
Add consuming-specific guards for row-shape settings, partitioning, null
handling, aggregation dictionary layout, and mutable text-index reuse because
those settings must stay aligned with the base config used for persisted
segment commit.
Add focused unit coverage and an end-to-end realtime integration test in
which a consuming override adds a dictionary and an inverted index while
committed segments stay RAW with no inverted index.
---
.../realtime/RealtimeSegmentDataManager.java | 12 +-
.../ConsumingSegmentTierOverrideRealtimeTest.java | 272 +++++++++++++++++++++
.../CustomDataQueryClusterIntegrationTest.java | 6 +
.../segment/local/utils/TableConfigUtils.java | 91 +++++++
...ableConfigConsumingSegmentTierOverrideTest.java | 181 ++++++++++++++
.../segment/local/utils/TableConfigUtilsTest.java | 11 +
6 files changed, 570 insertions(+), 3 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index fa423c7d081..3b0d9d72202 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -69,6 +69,7 @@ import
org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
import org.apache.pinot.segment.local.upsert.UpsertContext;
import org.apache.pinot.segment.local.utils.IngestionUtils;
+import org.apache.pinot.segment.local.utils.TableConfigUtils;
import org.apache.pinot.segment.spi.MutableSegment;
import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.segment.spi.creator.SegmentVersion;
@@ -1821,9 +1822,14 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
_isOffHeap = indexLoadingConfig.isRealtimeOffHeapAllocation();
_defaultNullHandlingEnabled = indexingConfig.isNullHandlingEnabled();
- // Start new realtime segment
+ // Start new realtime segment. Use a mutable-only tier-overwritten index
loading view when consuming tier overrides
+ // are configured; committed segments continue to use the base table
config.
String consumerDir = realtimeTableDataManager.getConsumerDir();
- RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder = new
RealtimeSegmentConfig.Builder(indexLoadingConfig)
+ IndexLoadingConfig consumingIndexLoadingConfig =
+ TableConfigUtils.buildConsumingSegmentIndexLoadingConfig(_tableConfig,
_schema, indexLoadingConfig);
+ RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder =
+ new RealtimeSegmentConfig.Builder(consumingIndexLoadingConfig);
+ realtimeSegmentConfigBuilder
.setTableNameWithType(_tableNameWithType)
.setSegmentName(_segmentNameStr)
.setStreamName(streamTopic)
@@ -1841,7 +1847,7 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
.setPartitionUpsertMetadataManager(partitionUpsertMetadataManager)
.setPartitionDedupMetadataManager(partitionDedupMetadataManager)
.setConsumerDir(consumerDir)
-
.setTextIndexConfig(_tableConfig.getIndexingConfig().getMultiColumnTextIndexConfig())
+
.setTextIndexConfig(consumingIndexLoadingConfig.getMultiColTextIndexConfig())
.setDropRecordOnPartitionMismatch(ingestionConfig != null
&& ingestionConfig.getStreamIngestionConfig() != null
&&
ingestionConfig.getStreamIngestionConfig().isDropRecordOnPartitionMismatch());
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ConsumingSegmentTierOverrideRealtimeTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ConsumingSegmentTierOverrideRealtimeTest.java
new file mode 100644
index 00000000000..5770b58abcf
--- /dev/null
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ConsumingSegmentTierOverrideRealtimeTest.java
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests.custom;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
+import org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl;
+import org.apache.pinot.segment.local.segment.store.SegmentLocalFSDirectory;
+import org.apache.pinot.segment.spi.ColumnMetadata;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.SegmentMetadata;
+import org.apache.pinot.segment.spi.datasource.DataSource;
+import org.apache.pinot.segment.spi.index.StandardIndexes;
+import org.apache.pinot.segment.spi.store.SegmentDirectory;
+import org.apache.pinot.server.starter.helix.BaseServerStarter;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.stream.StreamConfigProperties;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.ReadMode;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.util.TestUtils;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+
+/// End-to-end coverage for the synthetic `consuming` tier override on a
realtime table.
+///
+/// The table persists `profiledString` as RAW with no inverted index.
`tierOverwrites.consuming` upgrades that column
+/// to DICTIONARY + INVERTED only while the segment is mutable and consuming.
After force-commit, the immutable segment
+/// must retain the persisted RAW/no-inverted shape on disk.
+@Test(suiteName = "CustomClusterIntegrationTest")
+public class ConsumingSegmentTierOverrideRealtimeTest extends
CustomDataQueryClusterIntegrationTest {
+ private static final String TABLE_NAME =
"ConsumingSegmentTierOverrideRealtimeTest";
+ private static final int NUM_DOCS = 200;
+ private static final String PROFILED_STRING_COLUMN = "profiledString";
+ private static final String TIME_COLUMN = "tsMillis";
+
+ @Override
+ public String getTableName() {
+ return TABLE_NAME;
+ }
+
+ @Override
+ public boolean isRealtimeTable() {
+ return true;
+ }
+
+ @Override
+ protected long getCountStarResult() {
+ return NUM_DOCS;
+ }
+
+ @Override
+ protected int getRealtimeSegmentFlushSize() {
+ return NUM_DOCS * 1000;
+ }
+
+ @Override
+ protected Map<String, String> getStreamConfigMap() {
+ Map<String, String> streamConfigMap = new
HashMap<>(super.getStreamConfigMap());
+ streamConfigMap.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_TIME,
"365d");
+ return streamConfigMap;
+ }
+
+ @Override
+ public Schema createSchema() {
+ return new Schema.SchemaBuilder().setSchemaName(getTableName())
+ .addSingleValueDimension(PROFILED_STRING_COLUMN,
FieldSpec.DataType.STRING)
+ .addDateTimeField(TIME_COLUMN, FieldSpec.DataType.LONG,
"1:MILLISECONDS:EPOCH", "1:MILLISECONDS")
+ .build();
+ }
+
+ @Override
+ public String getTimeColumnName() {
+ return TIME_COLUMN;
+ }
+
+ @Override
+ protected String getSortedColumn() {
+ return null;
+ }
+
+ @Override
+ protected List<String> getNoDictionaryColumns() {
+ return List.of(PROFILED_STRING_COLUMN);
+ }
+
+ @Override
+ protected List<FieldConfig> getFieldConfigs() {
+ return List.of(new FieldConfig.Builder(PROFILED_STRING_COLUMN)
+ .withEncodingType(FieldConfig.EncodingType.RAW)
+
.withTierOverwrites(jsonNode("{\"consuming\":{\"encodingType\":\"DICTIONARY\","
+ + "\"indexes\":{\"inverted\":{\"disabled\":false}}}}"))
+ .build());
+ }
+
+ @Override
+ protected TableConfig createRealtimeTableConfig(File sampleAvroFile) {
+ AvroFileSchemaKafkaAvroMessageDecoder._avroFile = sampleAvroFile;
+ return getTableConfigBuilder(TableType.REALTIME)
+
.setTierOverwrites(jsonNode("{\"consuming\":{\"noDictionaryColumns\":[]}}"))
+ .build();
+ }
+
+ @Override
+ public List<File> createAvroFiles()
+ throws Exception {
+ org.apache.avro.Schema avroSchema =
org.apache.avro.Schema.createRecord("myRecord", null, null, false);
+ org.apache.avro.Schema stringSchema =
org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING);
+ org.apache.avro.Schema longSchema =
org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG);
+ avroSchema.setFields(List.of(
+ new org.apache.avro.Schema.Field(PROFILED_STRING_COLUMN, stringSchema,
null, null),
+ new org.apache.avro.Schema.Field(TIME_COLUMN, longSchema, null,
null)));
+
+ long now = System.currentTimeMillis();
+ try (AvroFilesAndWriters avroFilesAndWriters =
createAvroFilesAndWriters(avroSchema)) {
+ List<DataFileWriter<GenericData.Record>> writers =
avroFilesAndWriters.getWriters();
+ for (int i = 0; i < NUM_DOCS; i++) {
+ GenericData.Record record = new GenericData.Record(avroSchema);
+ record.put(PROFILED_STRING_COLUMN, "value-" + i % 4);
+ record.put(TIME_COLUMN, now + i);
+ writers.get(i % getNumAvroFiles()).append(record);
+ }
+ return avroFilesAndWriters.getAvroFiles();
+ }
+ }
+
+ @Test
+ public void testConsumingSegmentTierOverrideEndToEnd()
+ throws Exception {
+ String realtimeTableName =
TableNameBuilder.REALTIME.tableNameWithType(getTableName());
+
+ int consumingSegmentsInspected =
inspectConsumingSegments(realtimeTableName);
+ assertTrue(consumingSegmentsInspected > 0,
+ "Expected at least one consuming segment before forceCommit, got " +
consumingSegmentsInspected);
+
+ forceCommitAndWait(realtimeTableName);
+ }
+
+ private int inspectConsumingSegments(String realtimeTableName)
+ throws Exception {
+ return inspectSegments(realtimeTableName, MutableSegmentImpl.class,
segment -> {
+ DataSource profiled = segment.getDataSource(PROFILED_STRING_COLUMN);
+ assertNotNull(profiled.getDictionary(),
+ PROFILED_STRING_COLUMN + " must have a consuming-segment
dictionary");
+ assertNotNull(profiled.getInvertedIndex(),
+ PROFILED_STRING_COLUMN + " must have a consuming-segment inverted
index");
+ });
+ }
+
+ private int inspectImmutableSegments(String realtimeTableName)
+ throws Exception {
+ return inspectSegments(realtimeTableName, ImmutableSegmentImpl.class,
segment -> {
+ SegmentMetadata metadata = segment.getSegmentMetadata();
+ ColumnMetadata profiledMetadata =
metadata.getColumnMetadataFor(PROFILED_STRING_COLUMN);
+ assertNotNull(profiledMetadata, "Missing metadata for " +
PROFILED_STRING_COLUMN);
+ assertFalse(profiledMetadata.hasDictionary(),
+ PROFILED_STRING_COLUMN + " must remain RAW/no-dictionary after
commit");
+ assertEquals(profiledMetadata.getForwardIndexEncoding(),
FieldConfig.EncodingType.RAW,
+ PROFILED_STRING_COLUMN + " must persist with RAW forward-index
encoding after commit");
+
+ try (SegmentDirectory directory = new
SegmentLocalFSDirectory(metadata.getIndexDir(), ReadMode.mmap);
+ SegmentDirectory.Reader reader = directory.createReader()) {
+ assertTrue(reader.hasIndexFor(PROFILED_STRING_COLUMN,
StandardIndexes.forward()),
+ PROFILED_STRING_COLUMN + " RAW forward index must be persisted");
+ assertFalse(reader.hasIndexFor(PROFILED_STRING_COLUMN,
StandardIndexes.dictionary()),
+ PROFILED_STRING_COLUMN + " dictionary must not be persisted");
+ assertFalse(reader.hasIndexFor(PROFILED_STRING_COLUMN,
StandardIndexes.inverted()),
+ PROFILED_STRING_COLUMN + " inverted index must not be persisted");
+ }
+ });
+ }
+
+ private int inspectSegments(String realtimeTableName, Class<?> segmentType,
SegmentAssertion assertion)
+ throws Exception {
+ int inspected = 0;
+ for (BaseServerStarter serverStarter : getSharedServerStarters()) {
+ TableDataManager tableDataManager =
serverStarter.getServerInstance().getInstanceDataManager()
+ .getTableDataManager(realtimeTableName);
+ assertNotNull(tableDataManager, "Missing table data manager on server");
+ List<SegmentDataManager> segmentDataManagers =
tableDataManager.acquireAllSegments();
+ try {
+ for (SegmentDataManager segmentDataManager : segmentDataManagers) {
+ IndexSegment segment = segmentDataManager.getSegment();
+ if (!segmentType.isInstance(segment)) {
+ continue;
+ }
+ assertion.accept(segment);
+ inspected++;
+ }
+ } finally {
+ for (SegmentDataManager segmentDataManager : segmentDataManagers) {
+ tableDataManager.releaseSegment(segmentDataManager);
+ }
+ }
+ }
+ return inspected;
+ }
+
+ private void forceCommitAndWait(String realtimeTableName)
+ throws Exception {
+ String response =
getOrCreateAdminClient().getTableClient().forceCommit(realtimeTableName);
+ String jobId =
JsonUtils.stringToJsonNode(response).get("forceCommitJobId").asText();
+
+ TestUtils.waitForCondition(aVoid -> isForceCommitComplete(jobId), 120_000L,
+ "Timed out waiting for forceCommit job: " + jobId);
+ TestUtils.waitForCondition(aVoid -> {
+ try {
+ return inspectImmutableSegments(realtimeTableName) > 0;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }, 120_000L, "Timed out waiting for force-committed segments to load as
immutable");
+ }
+
+ private static JsonNode jsonNode(String json) {
+ try {
+ return JsonUtils.stringToJsonNode(json);
+ } catch (IOException e) {
+ throw new IllegalStateException("Invalid test JSON: " + json, e);
+ }
+ }
+
+ private boolean isForceCommitComplete(String jobId) {
+ try {
+ String jobStatusResponse =
getOrCreateAdminClient().getTableClient().getForceCommitJobStatus(jobId);
+ JsonNode jobStatus = JsonUtils.stringToJsonNode(jobStatusResponse);
+ return
jobStatus.get(CommonConstants.ControllerJob.NUM_CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED).asInt(-1)
== 0;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private interface SegmentAssertion {
+ void accept(IndexSegment segment)
+ throws Exception;
+ }
+}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CustomDataQueryClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CustomDataQueryClusterIntegrationTest.java
index 865e87b3bb9..5a09cb3f9ff 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CustomDataQueryClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CustomDataQueryClusterIntegrationTest.java
@@ -46,6 +46,7 @@ import
org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
import org.apache.pinot.integration.tests.BaseClusterIntegrationTest;
import org.apache.pinot.integration.tests.ClusterIntegrationTestUtils;
import org.apache.pinot.plugin.stream.kafka.KafkaStreamConfigProperties;
+import org.apache.pinot.server.starter.helix.BaseServerStarter;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.Schema;
@@ -305,6 +306,11 @@ public abstract class
CustomDataQueryClusterIntegrationTest extends BaseClusterI
return _sharedClusterTestSuite._controllerStarter;
}
+ /// Returns server starters from the shared suite instance.
+ protected List<BaseServerStarter> getSharedServerStarters() {
+ return _sharedClusterTestSuite._serverStarters;
+ }
+
/**
* Returns the property store from the shared suite instance.
*/
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index 5f87017a084..5d8c1919e6a 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -47,6 +47,7 @@ import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.segment.local.aggregator.ValueAggregator;
import org.apache.pinot.segment.local.aggregator.ValueAggregatorFactory;
import
org.apache.pinot.segment.local.recordtransformer.SchemaConformingTransformer;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.segment.spi.index.DictionaryIndexConfig;
import org.apache.pinot.segment.spi.index.FieldIndexConfigs;
@@ -132,6 +133,7 @@ public final class TableConfigUtils {
// this is duplicate with KinesisConfig.STREAM_TYPE, while instead of use
KinesisConfig.STREAM_TYPE directly, we
// hardcode the value here to avoid pulling the entire pinot-kinesis module
as dependency.
private static final String KINESIS_STREAM_TYPE = "kinesis";
+ private static final String CONSUMING_SEGMENT_TIER = "consuming";
private static final Set<String> UPSERT_DEDUP_ALLOWED_ROUTING_STRATEGIES =
ImmutableSet.of(RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE,
@@ -170,6 +172,21 @@ public final class TableConfigUtils {
public static void validate(TableConfig tableConfig, Schema schema,
@Nullable String typesToSkip) {
Preconditions.checkArgument(schema != null, "Schema should not be null for
table: %s", tableConfig.getTableName());
Set<ValidationType> skipTypes = parseTypesToSkipString(typesToSkip);
+ validateEffectiveTableConfig(tableConfig, schema, skipTypes);
+ if (skipTypes.contains(ValidationType.ALL) ||
!hasConsumingSegmentTierOverwriteForRealtimeTable(tableConfig)) {
+ return;
+ }
+ try {
+ TableConfig consumingTableConfig =
overwriteTableConfigForConsumingSegmentTier(tableConfig);
+ validateEffectiveTableConfig(consumingTableConfig, schema, skipTypes);
+ } catch (RuntimeException e) {
+ throw new IllegalStateException(
+ "tierOverwrites.consuming produces an invalid table config: " +
e.getMessage(), e);
+ }
+ }
+
+ private static void validateEffectiveTableConfig(TableConfig tableConfig,
Schema schema,
+ Set<ValidationType> skipTypes) {
// Sanitize the table config before validation
sanitize(tableConfig);
@@ -2171,6 +2188,32 @@ public final class TableConfigUtils {
}
}
+ private static TableConfig
overwriteTableConfigForConsumingSegmentTier(TableConfig tableConfig) {
+ IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
+ if (indexingConfig != null) {
+ JsonNode tierOverwrite =
getTierOverwrite(indexingConfig.getTierOverwrites(), CONSUMING_SEGMENT_TIER);
+ if (tierOverwrite != null) {
+ Preconditions.checkState(tierOverwrite.isObject(),
+ "tableIndexConfig.tierOverwrites.%s must be a JSON object",
CONSUMING_SEGMENT_TIER);
+ }
+ }
+ List<FieldConfig> fieldConfigList = tableConfig.getFieldConfigList();
+ if (fieldConfigList != null) {
+ for (FieldConfig fieldConfig : fieldConfigList) {
+ JsonNode tierOverwrite =
getTierOverwrite(fieldConfig.getTierOverwrites(), CONSUMING_SEGMENT_TIER);
+ if (tierOverwrite != null) {
+ Preconditions.checkState(tierOverwrite.isObject(),
+ "fieldConfigList[%s].tierOverwrites.%s must be a JSON object",
fieldConfig.getName(),
+ CONSUMING_SEGMENT_TIER);
+ }
+ }
+ }
+ TableConfig overwrittenTableConfig =
overwriteTableConfigForTier(tableConfig, CONSUMING_SEGMENT_TIER);
+ Preconditions.checkState(overwrittenTableConfig != tableConfig,
+ "Failed to apply tierOverwrites.consuming for table: %s",
tableConfig.getTableName());
+ return overwrittenTableConfig;
+ }
+
private static IndexingConfig applyIndexingConfigTierOverride(IndexingConfig
original, String tier)
throws IOException {
JsonNode tierOverwrites = original.getTierOverwrites();
@@ -2225,6 +2268,54 @@ public final class TableConfigUtils {
return JsonUtils.jsonNodeToObject(merged, FieldConfig.class);
}
+ private static boolean
hasConsumingSegmentTierOverwriteForRealtimeTable(TableConfig tableConfig) {
+ if (tableConfig.getTableType() != TableType.REALTIME
+ || (CollectionUtils.isNotEmpty(tableConfig.getTierConfigsList())
+ && tableConfig.getTierConfigsList().stream()
+ .anyMatch(tierConfig ->
CONSUMING_SEGMENT_TIER.equals(tierConfig.getName())))) {
+ return false;
+ }
+ IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
+ JsonNode tierOverwrite =
+ indexingConfig != null ?
getTierOverwrite(indexingConfig.getTierOverwrites(), CONSUMING_SEGMENT_TIER) :
null;
+ if (tierOverwrite != null && !tierOverwrite.isNull()) {
+ return true;
+ }
+ List<FieldConfig> fieldConfigList = tableConfig.getFieldConfigList();
+ if (fieldConfigList == null) {
+ return false;
+ }
+ for (FieldConfig fieldConfig : fieldConfigList) {
+ tierOverwrite = getTierOverwrite(fieldConfig.getTierOverwrites(),
CONSUMING_SEGMENT_TIER);
+ if (tierOverwrite != null && !tierOverwrite.isNull()) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+  /// Builds the [IndexLoadingConfig] used to create a mutable consuming segment.
+  /// If `tableConfig` is a realtime table that defines `tierOverwrites.consuming` under `tableIndexConfig` or a
+  /// `fieldConfigList` entry, and no `tierConfigs` entry is named `consuming`, the returned config is built from the
+  /// table config with the `consuming` overwrite applied; otherwise `indexLoadingConfig` is returned as is.
+  /// A `tierConfigs` entry named `consuming` is a real storage tier, so its overrides stay keyed by segment tier.
+  /// The given [IndexLoadingConfig] is never modified, so the commit path and immutable segment load path continue to
+  /// use the persisted table config and real segment tier.
+ public static IndexLoadingConfig
buildConsumingSegmentIndexLoadingConfig(TableConfig tableConfig, Schema schema,
+ IndexLoadingConfig indexLoadingConfig) {
+ if (!hasConsumingSegmentTierOverwriteForRealtimeTable(tableConfig)) {
+ return indexLoadingConfig;
+ }
+ TableConfig consumingTableConfig =
overwriteTableConfigForConsumingSegmentTier(tableConfig);
+ return new
IndexLoadingConfig(indexLoadingConfig.getInstanceDataManagerConfig(),
consumingTableConfig, schema);
+ }
+
+ @Nullable
+ private static JsonNode getTierOverwrite(@Nullable JsonNode tierOverwrites,
String tier) {
+ return tierOverwrites != null && tierOverwrites.isObject() ?
tierOverwrites.get(tier) : null;
+ }
+
/**
* Get the partition column from tableConfig instance assignment config map.
* @param tableConfig table config
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigConsumingSegmentTierOverrideTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigConsumingSegmentTierOverrideTest.java
new file mode 100644
index 00000000000..7892d784664
--- /dev/null
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigConsumingSegmentTierOverrideTest.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.utils;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.tier.TierFactory;
+import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.spi.index.FieldIndexConfigs;
+import org.apache.pinot.segment.spi.index.StandardIndexes;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.TierConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.expectThrows;
+
+
+/// Tests for `tierOverwrites.consuming` while constructing mutable realtime
consuming segments.
+public class TableConfigConsumingSegmentTierOverrideTest {
+ private static final String TABLE_NAME = "consumingTierOverrideTest";
+ private static final String PROFILED_COLUMN = "profiledString";
+ private static final String CONTROL_COLUMN = "controlString";
+ private static final String TIME_COLUMN = "tsMillis";
+ private static final String CONSUMING_TIER = "consuming";
+
+ @Test
+ public void consumingTierOverrideBuildsMutableViewOnly()
+ throws Exception {
+ Schema schema = schemaWithTime();
+ TableConfig tableConfig = tableWithConsumingOverride();
+ IndexLoadingConfig persistedConfig = new IndexLoadingConfig(tableConfig,
schema);
+
+ FieldIndexConfigs persistedColumnConfig =
persistedConfig.getFieldIndexConfig(PROFILED_COLUMN);
+ assertColumnIndexes(persistedColumnConfig, false, false);
+
assertNull(tableConfig.getIndexingConfig().getMultiColumnTextIndexConfig());
+
+ IndexLoadingConfig consumingIndexLoadingConfig =
+ TableConfigUtils.buildConsumingSegmentIndexLoadingConfig(tableConfig,
schema, persistedConfig);
+ RealtimeSegmentConfig consumingConfig = new
RealtimeSegmentConfig.Builder(consumingIndexLoadingConfig)
+
.setTextIndexConfig(consumingIndexLoadingConfig.getMultiColTextIndexConfig())
+ .build();
+ FieldIndexConfigs consumingColumnConfig =
consumingConfig.getIndexConfigByCol().get(PROFILED_COLUMN);
+ assertColumnIndexes(consumingColumnConfig, true, true);
+
+ FieldIndexConfigs controlColumnConfig =
consumingConfig.getIndexConfigByCol().get(CONTROL_COLUMN);
+ assertColumnIndexes(controlColumnConfig, true, false);
+ assertEquals(consumingConfig.getMultiColIndexConfig().getColumns(),
List.of(PROFILED_COLUMN));
+
+ assertEquals(tableConfig.getIndexingConfig().getNoDictionaryColumns(),
List.of(PROFILED_COLUMN));
+ assertEquals(tableConfig.getFieldConfigList().get(0).getEncodingType(),
FieldConfig.EncodingType.RAW);
+ }
+
+ @Test
+ public void realConsumingStorageTierDoesNotApplyMutableConsumingOverride()
+ throws Exception {
+ Schema schema = schemaWithTime();
+ TableConfig tableConfig = tableWithRealConsumingStorageTierOverride();
+ TableConfigUtils.validate(tableConfig, schema);
+ IndexLoadingConfig indexLoadingConfig = new
IndexLoadingConfig(tableConfig, schema);
+
+ IndexLoadingConfig consumingIndexLoadingConfig =
+ TableConfigUtils.buildConsumingSegmentIndexLoadingConfig(tableConfig,
schema, indexLoadingConfig);
+ RealtimeSegmentConfig consumingConfig = new
RealtimeSegmentConfig.Builder(consumingIndexLoadingConfig)
+
.setTextIndexConfig(consumingIndexLoadingConfig.getMultiColTextIndexConfig())
+ .build();
+
+ FieldIndexConfigs profiledColumnConfig =
consumingConfig.getIndexConfigByCol().get(PROFILED_COLUMN);
+ assertColumnIndexes(profiledColumnConfig, false, false);
+
+ IndexLoadingConfig consumingStorageConfig = new
IndexLoadingConfig(tableConfig, schema);
+ consumingStorageConfig.setSegmentTier(CONSUMING_TIER);
+ FieldIndexConfigs storageTierColumnConfig =
consumingStorageConfig.getFieldIndexConfig(PROFILED_COLUMN);
+ assertColumnIndexes(storageTierColumnConfig, true, true);
+ }
+
+ @Test
+ public void
validationRejectsConsumingTierOverwriteViewThatFailsNormalIndexValidation()
+ throws Exception {
+ assertInvalidTierOverwriteView(CONSUMING_TIER,
invalidRealtimeConsumingOverride(), schemaWithTime());
+ }
+
+ private static void assertColumnIndexes(FieldIndexConfigs fieldIndexConfigs,
boolean dictionaryEnabled,
+ boolean invertedEnabled) {
+
assertEquals(fieldIndexConfigs.getConfig(StandardIndexes.dictionary()).isEnabled(),
dictionaryEnabled);
+
assertEquals(fieldIndexConfigs.getConfig(StandardIndexes.inverted()).isEnabled(),
invertedEnabled);
+ }
+
+ private static void assertInvalidTierOverwriteView(String tier, TableConfig
tableConfig, Schema schema) {
+ IllegalStateException e = expectThrows(IllegalStateException.class, () ->
TableConfigUtils.validate(tableConfig,
+ schema));
+ assertTrue(e.getMessage().contains("tierOverwrites." + tier),
+ "Expected tier override validation error, got: " + e.getMessage());
+ }
+
+ private static TableConfig tableWithConsumingOverride()
+ throws Exception {
+ FieldConfig profiledFieldConfig =
profiledFieldConfigWithTierOverwrite(CONSUMING_TIER);
+ return new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
+ .setNoDictionaryColumns(List.of(PROFILED_COLUMN))
+
.setTierOverwrites(JsonUtils.stringToJsonNode("{\"consuming\":{\"noDictionaryColumns\":[],"
+ + "\"multiColumnTextIndexConfig\":{\"columns\":[\"" +
PROFILED_COLUMN + "\"]}}}"))
+ .setFieldConfigList(List.of(profiledFieldConfig))
+ .build();
+ }
+
+ private static TableConfig tableWithRealConsumingStorageTierOverride()
+ throws Exception {
+ FieldConfig profiledFieldConfig =
profiledFieldConfigWithTierOverwrite(CONSUMING_TIER);
+ return new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
+ .setTimeColumnName(TIME_COLUMN)
+ .setStreamConfigs(streamConfigs())
+ .setNoDictionaryColumns(List.of(PROFILED_COLUMN))
+ .setTierConfigList(List.of(consumingTierConfig()))
+
.setTierOverwrites(JsonUtils.stringToJsonNode("{\"consuming\":{\"noDictionaryColumns\":[]}}"))
+ .setFieldConfigList(List.of(profiledFieldConfig))
+ .build();
+ }
+
+ private static Schema schemaWithTime() {
+ return new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
+ .addSingleValueDimension(PROFILED_COLUMN, FieldSpec.DataType.STRING)
+ .addSingleValueDimension(CONTROL_COLUMN, FieldSpec.DataType.STRING)
+ .addDateTime(TIME_COLUMN, FieldSpec.DataType.LONG,
"1:MILLISECONDS:EPOCH", "1:MILLISECONDS")
+ .build();
+ }
+
+ private static TableConfig invalidRealtimeConsumingOverride()
+ throws Exception {
+ return new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
+ .setTimeColumnName(TIME_COLUMN)
+ .setStreamConfigs(streamConfigs())
+ .setNoDictionaryColumns(List.of(PROFILED_COLUMN))
+
.setFieldConfigList(List.of(profiledFieldConfigWithTierOverwrite(CONSUMING_TIER)))
+ .build();
+ }
+
+ private static TierConfig consumingTierConfig() {
+ return new TierConfig(CONSUMING_TIER,
TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "30d", null,
+ TierFactory.PINOT_SERVER_STORAGE_TYPE, "consuming_tag_REALTIME", null,
null);
+ }
+
+ private static FieldConfig profiledFieldConfigWithTierOverwrite(String tier)
+ throws Exception {
+ return new FieldConfig.Builder(PROFILED_COLUMN)
+ .withEncodingType(FieldConfig.EncodingType.RAW)
+ .withTierOverwrites(JsonUtils.stringToJsonNode("{\"" + tier +
"\":{\"encodingType\":\"DICTIONARY\","
+ + "\"indexes\":{\"inverted\":{\"disabled\":false}}}}"))
+ .build();
+ }
+
+ private static Map<String, String> streamConfigs() {
+ return Map.of("streamType", "kafka", "stream.kafka.topic.name", "test",
"stream.kafka.decoder.class.name",
+ "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder");
+ }
+}
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index e56e1149da9..247b1f973df 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -1104,6 +1104,17 @@ public class TableConfigUtilsTest {
// expected
}
+    // `consuming` also names the synthetic lifecycle tier for mutable segments, yet it must still validate as a
+    // configured storage tier name for backward compatibility.
+ tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
+ .setTimeColumnName(TIME_COLUMN)
+ .setStreamConfigs(getStreamConfigs())
+ .setTierConfigList(Lists.newArrayList(
+ new TierConfig("consuming",
TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "30d", null,
+ TierFactory.PINOT_SERVER_STORAGE_TYPE,
"consuming_tag_REALTIME", null, null)))
+ .build();
+ TableConfigUtils.validate(tableConfig, schema);
+
// fixedSegmentSelector
tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
.setTierConfigList(Lists.newArrayList(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]