This is an automated email from the ASF dual-hosted git repository.

xiangfu0 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 9e2335de3c3 Support consuming tier overrides for realtime segments 
(#18480)
9e2335de3c3 is described below

commit 9e2335de3c3f252eef1147b7af7f5e1d16fde403
Author: Xiang Fu <[email protected]>
AuthorDate: Tue Jun 2 20:07:51 2026 -0700

    Support consuming tier overrides for realtime segments (#18480)
    
    Reuse the existing tierOverwrites mechanism only for mutable realtime 
consuming segments. A realtime table opts in by defining 
tierOverwrites.consuming, so no separate tableIndexConfig feature flag is 
needed.
    
    Keep generic storage-tier override behavior tied to actual segment tier 
metadata. If tierConfigs declares a real storage tier named consuming, that 
name keeps normal storage-tier semantics for backward compatibility.
    
    Apply the consuming override first, then validate the effective consuming 
table config through the normal validation path. Persisted segment commit and 
immutable segment loading continue to use the base table config unless normal 
storage-tier metadata applies during segment loading.
    
    Add consuming-specific guards for row-shape settings, partitioning, null 
handling, aggregation dictionary layout, and mutable text-index reuse because 
those settings must stay aligned with the base config used for persisted 
segment commit.
    
    Add focused unit coverage and an end-to-end realtime integration test 
covering consuming dictionary and inverted index overrides with RAW and no 
inverted index on committed segments.
---
 .../realtime/RealtimeSegmentDataManager.java       |  12 +-
 .../ConsumingSegmentTierOverrideRealtimeTest.java  | 272 +++++++++++++++++++++
 .../CustomDataQueryClusterIntegrationTest.java     |   6 +
 .../segment/local/utils/TableConfigUtils.java      |  91 +++++++
 ...ableConfigConsumingSegmentTierOverrideTest.java | 181 ++++++++++++++
 .../segment/local/utils/TableConfigUtilsTest.java  |  11 +
 6 files changed, 570 insertions(+), 3 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index fa423c7d081..3b0d9d72202 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -69,6 +69,7 @@ import 
org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
 import org.apache.pinot.segment.local.upsert.UpsertContext;
 import org.apache.pinot.segment.local.utils.IngestionUtils;
+import org.apache.pinot.segment.local.utils.TableConfigUtils;
 import org.apache.pinot.segment.spi.MutableSegment;
 import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.segment.spi.creator.SegmentVersion;
@@ -1821,9 +1822,14 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
     _isOffHeap = indexLoadingConfig.isRealtimeOffHeapAllocation();
     _defaultNullHandlingEnabled = indexingConfig.isNullHandlingEnabled();
 
-    // Start new realtime segment
+    // Start new realtime segment. Use a mutable-only tier-overwritten index 
loading view when consuming tier overrides
+    // are configured; committed segments continue to use the base table 
config.
     String consumerDir = realtimeTableDataManager.getConsumerDir();
-    RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder = new 
RealtimeSegmentConfig.Builder(indexLoadingConfig)
+    IndexLoadingConfig consumingIndexLoadingConfig =
+        TableConfigUtils.buildConsumingSegmentIndexLoadingConfig(_tableConfig, 
_schema, indexLoadingConfig);
+    RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder =
+        new RealtimeSegmentConfig.Builder(consumingIndexLoadingConfig);
+    realtimeSegmentConfigBuilder
         .setTableNameWithType(_tableNameWithType)
         .setSegmentName(_segmentNameStr)
         .setStreamName(streamTopic)
@@ -1841,7 +1847,7 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
         .setPartitionUpsertMetadataManager(partitionUpsertMetadataManager)
         .setPartitionDedupMetadataManager(partitionDedupMetadataManager)
         .setConsumerDir(consumerDir)
-        
.setTextIndexConfig(_tableConfig.getIndexingConfig().getMultiColumnTextIndexConfig())
+        
.setTextIndexConfig(consumingIndexLoadingConfig.getMultiColTextIndexConfig())
         .setDropRecordOnPartitionMismatch(ingestionConfig != null
             && ingestionConfig.getStreamIngestionConfig() != null
             && 
ingestionConfig.getStreamIngestionConfig().isDropRecordOnPartitionMismatch());
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ConsumingSegmentTierOverrideRealtimeTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ConsumingSegmentTierOverrideRealtimeTest.java
new file mode 100644
index 00000000000..5770b58abcf
--- /dev/null
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ConsumingSegmentTierOverrideRealtimeTest.java
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests.custom;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
+import org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl;
+import org.apache.pinot.segment.local.segment.store.SegmentLocalFSDirectory;
+import org.apache.pinot.segment.spi.ColumnMetadata;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.SegmentMetadata;
+import org.apache.pinot.segment.spi.datasource.DataSource;
+import org.apache.pinot.segment.spi.index.StandardIndexes;
+import org.apache.pinot.segment.spi.store.SegmentDirectory;
+import org.apache.pinot.server.starter.helix.BaseServerStarter;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.stream.StreamConfigProperties;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.ReadMode;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.util.TestUtils;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+
+/// End-to-end coverage for the synthetic `consuming` tier override on a realtime table.
+///
+/// The table persists `profiledString` as RAW with no inverted index. `tierOverwrites.consuming` upgrades that column
+/// to DICTIONARY + INVERTED only while the segment is mutable and consuming. After force-commit, the immutable segment
+/// must retain the persisted RAW/no-inverted shape on disk.
+@Test(suiteName = "CustomClusterIntegrationTest")
+public class ConsumingSegmentTierOverrideRealtimeTest extends CustomDataQueryClusterIntegrationTest {
+  private static final String TABLE_NAME = "ConsumingSegmentTierOverrideRealtimeTest";
+  private static final int NUM_DOCS = 200;
+  private static final String PROFILED_STRING_COLUMN = "profiledString";
+  private static final String TIME_COLUMN = "tsMillis";
+
+  @Override
+  public String getTableName() {
+    return TABLE_NAME;
+  }
+
+  @Override
+  public boolean isRealtimeTable() {
+    return true;
+  }
+
+  @Override
+  protected long getCountStarResult() {
+    return NUM_DOCS;
+  }
+
+  /// Row-count flush threshold set far above `NUM_DOCS`, so the test data alone never reaches it and the segment
+  /// stays consuming until the test force-commits it.
+  @Override
+  protected int getRealtimeSegmentFlushSize() {
+    return NUM_DOCS * 1000;
+  }
+
+  /// Adds a one-year time-based flush threshold on top of the base stream config for the same reason.
+  @Override
+  protected Map<String, String> getStreamConfigMap() {
+    Map<String, String> streamConfigMap = new HashMap<>(super.getStreamConfigMap());
+    streamConfigMap.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_TIME, "365d");
+    return streamConfigMap;
+  }
+
+  @Override
+  public Schema createSchema() {
+    return new Schema.SchemaBuilder().setSchemaName(getTableName())
+        .addSingleValueDimension(PROFILED_STRING_COLUMN, FieldSpec.DataType.STRING)
+        .addDateTimeField(TIME_COLUMN, FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS")
+        .build();
+  }
+
+  @Override
+  public String getTimeColumnName() {
+    return TIME_COLUMN;
+  }
+
+  @Override
+  protected String getSortedColumn() {
+    return null;
+  }
+
+  @Override
+  protected List<String> getNoDictionaryColumns() {
+    return List.of(PROFILED_STRING_COLUMN);
+  }
+
+  /// Persists `profiledString` as RAW. Its field-level `consuming` override switches it to DICTIONARY encoding with
+  /// an inverted index for the consuming view only.
+  @Override
+  protected List<FieldConfig> getFieldConfigs() {
+    return List.of(new FieldConfig.Builder(PROFILED_STRING_COLUMN)
+        .withEncodingType(FieldConfig.EncodingType.RAW)
+        .withTierOverwrites(jsonNode("{\"consuming\":{\"encodingType\":\"DICTIONARY\","
+            + "\"indexes\":{\"inverted\":{\"disabled\":false}}}}"))
+        .build());
+  }
+
+  /// Adds a table-level `consuming` override that clears `noDictionaryColumns` for the consuming view, because the
+  /// base config lists `profiledString` as a no-dictionary column.
+  @Override
+  protected TableConfig createRealtimeTableConfig(File sampleAvroFile) {
+    AvroFileSchemaKafkaAvroMessageDecoder._avroFile = sampleAvroFile;
+    return getTableConfigBuilder(TableType.REALTIME)
+        .setTierOverwrites(jsonNode("{\"consuming\":{\"noDictionaryColumns\":[]}}"))
+        .build();
+  }
+
+  /// Writes `NUM_DOCS` records, cycling through four distinct `profiledString` values with increasing timestamps,
+  /// spread round-robin across the Avro files.
+  @Override
+  public List<File> createAvroFiles()
+      throws Exception {
+    org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createRecord("myRecord", null, null, false);
+    org.apache.avro.Schema stringSchema = org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING);
+    org.apache.avro.Schema longSchema = org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG);
+    avroSchema.setFields(List.of(
+        new org.apache.avro.Schema.Field(PROFILED_STRING_COLUMN, stringSchema, null, null),
+        new org.apache.avro.Schema.Field(TIME_COLUMN, longSchema, null, null)));
+
+    long now = System.currentTimeMillis();
+    try (AvroFilesAndWriters avroFilesAndWriters = createAvroFilesAndWriters(avroSchema)) {
+      List<DataFileWriter<GenericData.Record>> writers = avroFilesAndWriters.getWriters();
+      for (int i = 0; i < NUM_DOCS; i++) {
+        GenericData.Record record = new GenericData.Record(avroSchema);
+        record.put(PROFILED_STRING_COLUMN, "value-" + i % 4);
+        record.put(TIME_COLUMN, now + i);
+        writers.get(i % getNumAvroFiles()).append(record);
+      }
+      return avroFilesAndWriters.getAvroFiles();
+    }
+  }
+
+  /// Checks the consuming (dictionary + inverted) view on mutable segments, then force-commits and checks the
+  /// persisted RAW / no-dictionary / no-inverted shape on the resulting immutable segments.
+  @Test
+  public void testConsumingSegmentTierOverrideEndToEnd()
+      throws Exception {
+    String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(getTableName());
+
+    int consumingSegmentsInspected = inspectConsumingSegments(realtimeTableName);
+    assertTrue(consumingSegmentsInspected > 0,
+        "Expected at least one consuming segment before forceCommit, got " + consumingSegmentsInspected);
+
+    forceCommitAndWait(realtimeTableName);
+  }
+
+  /// Asserts that every mutable segment exposes a dictionary and an inverted index for `profiledString`, and returns
+  /// the number of mutable segments checked.
+  private int inspectConsumingSegments(String realtimeTableName)
+      throws Exception {
+    return inspectSegments(realtimeTableName, MutableSegmentImpl.class, segment -> {
+      DataSource profiled = segment.getDataSource(PROFILED_STRING_COLUMN);
+      assertNotNull(profiled.getDictionary(),
+          PROFILED_STRING_COLUMN + " must have a consuming-segment dictionary");
+      assertNotNull(profiled.getInvertedIndex(),
+          PROFILED_STRING_COLUMN + " must have a consuming-segment inverted index");
+    });
+  }
+
+  /// Asserts, from both column metadata and the on-disk segment directory, that every immutable segment stores
+  /// `profiledString` as RAW with no dictionary and no inverted index. Returns the number of segments checked.
+  private int inspectImmutableSegments(String realtimeTableName)
+      throws Exception {
+    return inspectSegments(realtimeTableName, ImmutableSegmentImpl.class, segment -> {
+      SegmentMetadata metadata = segment.getSegmentMetadata();
+      ColumnMetadata profiledMetadata = metadata.getColumnMetadataFor(PROFILED_STRING_COLUMN);
+      assertNotNull(profiledMetadata, "Missing metadata for " + PROFILED_STRING_COLUMN);
+      assertFalse(profiledMetadata.hasDictionary(),
+          PROFILED_STRING_COLUMN + " must remain RAW/no-dictionary after commit");
+      assertEquals(profiledMetadata.getForwardIndexEncoding(), FieldConfig.EncodingType.RAW,
+          PROFILED_STRING_COLUMN + " must persist with RAW forward-index encoding after commit");
+
+      try (SegmentDirectory directory = new SegmentLocalFSDirectory(metadata.getIndexDir(), ReadMode.mmap);
+          SegmentDirectory.Reader reader = directory.createReader()) {
+        assertTrue(reader.hasIndexFor(PROFILED_STRING_COLUMN, StandardIndexes.forward()),
+            PROFILED_STRING_COLUMN + " RAW forward index must be persisted");
+        assertFalse(reader.hasIndexFor(PROFILED_STRING_COLUMN, StandardIndexes.dictionary()),
+            PROFILED_STRING_COLUMN + " dictionary must not be persisted");
+        assertFalse(reader.hasIndexFor(PROFILED_STRING_COLUMN, StandardIndexes.inverted()),
+            PROFILED_STRING_COLUMN + " inverted index must not be persisted");
+      }
+    });
+  }
+
+  /// Runs `assertion` on every segment that is an instance of `segmentType` across all shared servers, and returns
+  /// how many segments were checked. Acquired segments are always released, even if an assertion fails.
+  private int inspectSegments(String realtimeTableName, Class<?> segmentType, SegmentAssertion assertion)
+      throws Exception {
+    int inspected = 0;
+    for (BaseServerStarter serverStarter : getSharedServerStarters()) {
+      TableDataManager tableDataManager = serverStarter.getServerInstance().getInstanceDataManager()
+          .getTableDataManager(realtimeTableName);
+      assertNotNull(tableDataManager, "Missing table data manager on server");
+      List<SegmentDataManager> segmentDataManagers = tableDataManager.acquireAllSegments();
+      try {
+        for (SegmentDataManager segmentDataManager : segmentDataManagers) {
+          IndexSegment segment = segmentDataManager.getSegment();
+          if (!segmentType.isInstance(segment)) {
+            continue;
+          }
+          assertion.accept(segment);
+          inspected++;
+        }
+      } finally {
+        for (SegmentDataManager segmentDataManager : segmentDataManagers) {
+          tableDataManager.releaseSegment(segmentDataManager);
+        }
+      }
+    }
+    return inspected;
+  }
+
+  /// Triggers a force-commit and waits for the controller job to report no segments left to commit. Then waits until
+  /// at least one committed segment is loaded as immutable and passes the persisted-shape checks.
+  private void forceCommitAndWait(String realtimeTableName)
+      throws Exception {
+    String response = getOrCreateAdminClient().getTableClient().forceCommit(realtimeTableName);
+    // Fail with the raw response instead of a bare NullPointerException if the job id is absent.
+    JsonNode jobIdNode = JsonUtils.stringToJsonNode(response).get("forceCommitJobId");
+    assertNotNull(jobIdNode, "forceCommit response is missing forceCommitJobId: " + response);
+    String jobId = jobIdNode.asText();
+
+    TestUtils.waitForCondition(aVoid -> isForceCommitComplete(jobId), 120_000L,
+        "Timed out waiting for forceCommit job: " + jobId);
+    TestUtils.waitForCondition(aVoid -> {
+      try {
+        return inspectImmutableSegments(realtimeTableName) > 0;
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }, 120_000L, "Timed out waiting for force-committed segments to load as immutable");
+  }
+
+  private static JsonNode jsonNode(String json) {
+    try {
+      return JsonUtils.stringToJsonNode(json);
+    } catch (IOException e) {
+      throw new IllegalStateException("Invalid test JSON: " + json, e);
+    }
+  }
+
+  /// Returns true once the force-commit job reports zero consuming segments left to commit. `path` is used instead
+  /// of `get` so a missing counter reads as the `-1` fallback (not complete yet) rather than throwing a
+  /// NullPointerException.
+  private boolean isForceCommitComplete(String jobId) {
+    try {
+      String jobStatusResponse = getOrCreateAdminClient().getTableClient().getForceCommitJobStatus(jobId);
+      JsonNode jobStatus = JsonUtils.stringToJsonNode(jobStatusResponse);
+      return jobStatus.path(CommonConstants.ControllerJob.NUM_CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED).asInt(-1) == 0;
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /// Per-segment assertion callback that, unlike `Consumer`, may throw checked exceptions.
+  private interface SegmentAssertion {
+    void accept(IndexSegment segment)
+        throws Exception;
+  }
+}
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CustomDataQueryClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CustomDataQueryClusterIntegrationTest.java
index 865e87b3bb9..5a09cb3f9ff 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CustomDataQueryClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CustomDataQueryClusterIntegrationTest.java
@@ -46,6 +46,7 @@ import 
org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
 import org.apache.pinot.integration.tests.BaseClusterIntegrationTest;
 import org.apache.pinot.integration.tests.ClusterIntegrationTestUtils;
 import org.apache.pinot.plugin.stream.kafka.KafkaStreamConfigProperties;
+import org.apache.pinot.server.starter.helix.BaseServerStarter;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.Schema;
@@ -305,6 +306,11 @@ public abstract class 
CustomDataQueryClusterIntegrationTest extends BaseClusterI
     return _sharedClusterTestSuite._controllerStarter;
   }
 
+  /**
+   * Returns the server starters held by the shared cluster test suite.
+   */
+  protected List<BaseServerStarter> getSharedServerStarters() {
+    return _sharedClusterTestSuite._serverStarters;
+  }
+
   /**
    * Returns the property store from the shared suite instance.
    */
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index 5f87017a084..5d8c1919e6a 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -47,6 +47,7 @@ import org.apache.pinot.common.utils.config.TagNameUtils;
 import org.apache.pinot.segment.local.aggregator.ValueAggregator;
 import org.apache.pinot.segment.local.aggregator.ValueAggregatorFactory;
 import 
org.apache.pinot.segment.local.recordtransformer.SchemaConformingTransformer;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
 import org.apache.pinot.segment.spi.index.DictionaryIndexConfig;
 import org.apache.pinot.segment.spi.index.FieldIndexConfigs;
@@ -132,6 +133,7 @@ public final class TableConfigUtils {
   // this is duplicate with KinesisConfig.STREAM_TYPE, while instead of use 
KinesisConfig.STREAM_TYPE directly, we
   // hardcode the value here to avoid pulling the entire pinot-kinesis module 
as dependency.
   private static final String KINESIS_STREAM_TYPE = "kinesis";
+  private static final String CONSUMING_SEGMENT_TIER = "consuming";
 
   private static final Set<String> UPSERT_DEDUP_ALLOWED_ROUTING_STRATEGIES =
       
ImmutableSet.of(RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE,
@@ -170,6 +172,21 @@ public final class TableConfigUtils {
   public static void validate(TableConfig tableConfig, Schema schema, 
@Nullable String typesToSkip) {
     Preconditions.checkArgument(schema != null, "Schema should not be null for 
table: %s", tableConfig.getTableName());
     Set<ValidationType> skipTypes = parseTypesToSkipString(typesToSkip);
+    // First pass: validate the base table config. Per this change, the base config is the one used for persisted
+    // segment commit and immutable segment loading.
+    validateEffectiveTableConfig(tableConfig, schema, skipTypes);
+    // A second pass runs only when validation is not fully skipped and the realtime table opts in through
+    // tierOverwrites.consuming (a real storage tier named "consuming" disables the opt-in).
+    if (skipTypes.contains(ValidationType.ALL) || 
!hasConsumingSegmentTierOverwriteForRealtimeTable(tableConfig)) {
+      return;
+    }
+    // Second pass: apply the consuming override, then run the same validation on the resulting view. Any runtime
+    // failure, including a non-object override, is rethrown as IllegalStateException naming tierOverwrites.consuming,
+    // with the original exception kept as the cause.
+    try {
+      TableConfig consumingTableConfig = 
overwriteTableConfigForConsumingSegmentTier(tableConfig);
+      validateEffectiveTableConfig(consumingTableConfig, schema, skipTypes);
+    } catch (RuntimeException e) {
+      throw new IllegalStateException(
+          "tierOverwrites.consuming produces an invalid table config: " + 
e.getMessage(), e);
+    }
+  }
+
+  private static void validateEffectiveTableConfig(TableConfig tableConfig, 
Schema schema,
+      Set<ValidationType> skipTypes) {
     // Sanitize the table config before validation
     sanitize(tableConfig);
 
@@ -2171,6 +2188,32 @@ public final class TableConfigUtils {
     }
   }
 
+  private static TableConfig 
overwriteTableConfigForConsumingSegmentTier(TableConfig tableConfig) {
+    IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
+    if (indexingConfig != null) {
+      JsonNode tierOverwrite = 
getTierOverwrite(indexingConfig.getTierOverwrites(), CONSUMING_SEGMENT_TIER);
+      if (tierOverwrite != null) {
+        Preconditions.checkState(tierOverwrite.isObject(),
+            "tableIndexConfig.tierOverwrites.%s must be a JSON object", 
CONSUMING_SEGMENT_TIER);
+      }
+    }
+    List<FieldConfig> fieldConfigList = tableConfig.getFieldConfigList();
+    if (fieldConfigList != null) {
+      for (FieldConfig fieldConfig : fieldConfigList) {
+        JsonNode tierOverwrite = 
getTierOverwrite(fieldConfig.getTierOverwrites(), CONSUMING_SEGMENT_TIER);
+        if (tierOverwrite != null) {
+          Preconditions.checkState(tierOverwrite.isObject(),
+              "fieldConfigList[%s].tierOverwrites.%s must be a JSON object", 
fieldConfig.getName(),
+              CONSUMING_SEGMENT_TIER);
+        }
+      }
+    }
+    TableConfig overwrittenTableConfig = 
overwriteTableConfigForTier(tableConfig, CONSUMING_SEGMENT_TIER);
+    Preconditions.checkState(overwrittenTableConfig != tableConfig,
+        "Failed to apply tierOverwrites.consuming for table: %s", 
tableConfig.getTableName());
+    return overwrittenTableConfig;
+  }
+
   private static IndexingConfig applyIndexingConfigTierOverride(IndexingConfig 
original, String tier)
       throws IOException {
     JsonNode tierOverwrites = original.getTierOverwrites();
@@ -2225,6 +2268,54 @@ public final class TableConfigUtils {
     return JsonUtils.jsonNodeToObject(merged, FieldConfig.class);
   }
 
+  private static boolean 
hasConsumingSegmentTierOverwriteForRealtimeTable(TableConfig tableConfig) {
+    if (tableConfig.getTableType() != TableType.REALTIME
+        || (CollectionUtils.isNotEmpty(tableConfig.getTierConfigsList())
+            && tableConfig.getTierConfigsList().stream()
+                .anyMatch(tierConfig -> 
CONSUMING_SEGMENT_TIER.equals(tierConfig.getName())))) {
+      return false;
+    }
+    IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
+    JsonNode tierOverwrite =
+        indexingConfig != null ? 
getTierOverwrite(indexingConfig.getTierOverwrites(), CONSUMING_SEGMENT_TIER) : 
null;
+    if (tierOverwrite != null && !tierOverwrite.isNull()) {
+      return true;
+    }
+    List<FieldConfig> fieldConfigList = tableConfig.getFieldConfigList();
+    if (fieldConfigList == null) {
+      return false;
+    }
+    for (FieldConfig fieldConfig : fieldConfigList) {
+      tierOverwrite = getTierOverwrite(fieldConfig.getTierOverwrites(), 
CONSUMING_SEGMENT_TIER);
+      if (tierOverwrite != null && !tierOverwrite.isNull()) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /// Returns the [IndexLoadingConfig] to use when creating a mutable consuming segment.
+  ///
+  /// If the realtime table opts in via `tierOverwrites.consuming` (under `tableIndexConfig` or a `fieldConfigList`
+  /// entry, and no real storage tier is named `consuming`), a new config is built from the consuming-tier view of the
+  /// table config. Otherwise `indexLoadingConfig` is returned as-is.
+  ///
+  /// `indexLoadingConfig` is only read here. The commit path and the immutable segment load path therefore keep
+  /// using the base table config and the real segment tier.
+  ///
+  /// NOTE(review): the new config is created only from the instance data manager config, the consuming table config,
+  /// and `schema`. Any other state already set on `indexLoadingConfig` is not copied over; confirm that the mutable
+  /// segment does not rely on it.
+  ///
+  /// @throws IllegalStateException if a `consuming` override is not a JSON object or cannot be applied
+  public static IndexLoadingConfig buildConsumingSegmentIndexLoadingConfig(TableConfig tableConfig, Schema schema,
+      IndexLoadingConfig indexLoadingConfig) {
+    if (hasConsumingSegmentTierOverwriteForRealtimeTable(tableConfig)) {
+      TableConfig consumingTableConfig = overwriteTableConfigForConsumingSegmentTier(tableConfig);
+      return new IndexLoadingConfig(indexLoadingConfig.getInstanceDataManagerConfig(), consumingTableConfig, schema);
+    }
+    return indexLoadingConfig;
+  }
+
+  /// Returns the override for `tier` from a `tierOverwrites` JSON object, or `null` when there is none.
+  ///
+  /// "None" means any of: `tierOverwrites` is absent, `tierOverwrites` is not a JSON object, it has no entry for
+  /// `tier`, or it maps `tier` to an explicit JSON `null`. The explicit-`null` case is normalised here on purpose.
+  /// The opt-in check already ignores `"consuming": null`, while the JSON-object precondition would otherwise reject
+  /// that same entry. Example: a table with a real consuming override plus a field carrying `"consuming": null`
+  /// would pass the opt-in check and then fail with "must be a JSON object".
+  ///
+  /// NOTE(review): confirm that `overwriteTableConfigForTier` also treats an explicit `null` entry as absent.
+  @Nullable
+  private static JsonNode getTierOverwrite(@Nullable JsonNode tierOverwrites, String tier) {
+    if (tierOverwrites == null || !tierOverwrites.isObject()) {
+      return null;
+    }
+    JsonNode tierOverwrite = tierOverwrites.get(tier);
+    // Jackson returns a NullNode (not Java null) for an explicit JSON null value.
+    return tierOverwrite == null || tierOverwrite.isNull() ? null : tierOverwrite;
+  }
+
   /**
    * Get the partition column from tableConfig instance assignment config map.
    * @param tableConfig table config
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigConsumingSegmentTierOverrideTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigConsumingSegmentTierOverrideTest.java
new file mode 100644
index 00000000000..7892d784664
--- /dev/null
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigConsumingSegmentTierOverrideTest.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.utils;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.tier.TierFactory;
+import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.spi.index.FieldIndexConfigs;
+import org.apache.pinot.segment.spi.index.StandardIndexes;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.TierConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.expectThrows;
+
+
+/// Tests for `tierOverwrites.consuming` while constructing mutable realtime 
consuming segments.
+public class TableConfigConsumingSegmentTierOverrideTest {
+  private static final String TABLE_NAME = "consumingTierOverrideTest";
+  private static final String PROFILED_COLUMN = "profiledString";
+  private static final String CONTROL_COLUMN = "controlString";
+  private static final String TIME_COLUMN = "tsMillis";
+  private static final String CONSUMING_TIER = "consuming";
+
+  /// The consuming override must change only the mutable-segment view, never the base table config.
+  @Test
+  public void consumingTierOverrideBuildsMutableViewOnly()
+      throws Exception {
+    Schema schema = schemaWithTime();
+    TableConfig tableConfig = tableWithConsumingOverride();
+
+    // Base (persisted) view: no dictionary or inverted index on the profiled column, no multi-column text index.
+    IndexLoadingConfig baseLoadingConfig = new IndexLoadingConfig(tableConfig, schema);
+    assertColumnIndexes(baseLoadingConfig.getFieldIndexConfig(PROFILED_COLUMN), false, false);
+    assertNull(tableConfig.getIndexingConfig().getMultiColumnTextIndexConfig());
+
+    // Consuming view: the overrides enable dictionary + inverted on the profiled column and add the text index.
+    IndexLoadingConfig mutableLoadingConfig =
+        TableConfigUtils.buildConsumingSegmentIndexLoadingConfig(tableConfig, schema, baseLoadingConfig);
+    RealtimeSegmentConfig mutableSegmentConfig = new RealtimeSegmentConfig.Builder(mutableLoadingConfig)
+        .setTextIndexConfig(mutableLoadingConfig.getMultiColTextIndexConfig())
+        .build();
+    assertColumnIndexes(mutableSegmentConfig.getIndexConfigByCol().get(PROFILED_COLUMN), true, true);
+    assertColumnIndexes(mutableSegmentConfig.getIndexConfigByCol().get(CONTROL_COLUMN), true, false);
+    assertEquals(mutableSegmentConfig.getMultiColIndexConfig().getColumns(), List.of(PROFILED_COLUMN));
+
+    // Building the consuming view must leave the base table config untouched.
+    assertEquals(tableConfig.getIndexingConfig().getNoDictionaryColumns(), List.of(PROFILED_COLUMN));
+    assertEquals(tableConfig.getFieldConfigList().get(0).getEncodingType(), FieldConfig.EncodingType.RAW);
+  }
+
+  /// A real storage tier named `consuming` keeps storage-tier semantics instead of enabling the mutable-only view.
+  @Test
+  public void realConsumingStorageTierDoesNotApplyMutableConsumingOverride()
+      throws Exception {
+    Schema schema = schemaWithTime();
+    TableConfig tableConfig = tableWithRealConsumingStorageTierOverride();
+    TableConfigUtils.validate(tableConfig, schema);
+
+    // The mutable segment keeps the base layout: no dictionary, no inverted index.
+    IndexLoadingConfig baseLoadingConfig = new IndexLoadingConfig(tableConfig, schema);
+    IndexLoadingConfig mutableLoadingConfig =
+        TableConfigUtils.buildConsumingSegmentIndexLoadingConfig(tableConfig, schema, baseLoadingConfig);
+    RealtimeSegmentConfig mutableSegmentConfig = new RealtimeSegmentConfig.Builder(mutableLoadingConfig)
+        .setTextIndexConfig(mutableLoadingConfig.getMultiColTextIndexConfig())
+        .build();
+    assertColumnIndexes(mutableSegmentConfig.getIndexConfigByCol().get(PROFILED_COLUMN), false, false);
+
+    // A segment whose storage tier is "consuming" still picks up the override through the normal tier path.
+    IndexLoadingConfig storageTierLoadingConfig = new IndexLoadingConfig(tableConfig, schema);
+    storageTierLoadingConfig.setSegmentTier(CONSUMING_TIER);
+    assertColumnIndexes(storageTierLoadingConfig.getFieldIndexConfig(PROFILED_COLUMN), true, true);
+  }
+
+  /// A consuming override whose resulting view fails normal index validation must be rejected by `validate`.
+  @Test
+  public void validationRejectsConsumingTierOverwriteViewThatFailsNormalIndexValidation()
+      throws Exception {
+    TableConfig invalidTableConfig = invalidRealtimeConsumingOverride();
+    Schema schema = schemaWithTime();
+    assertInvalidTierOverwriteView(CONSUMING_TIER, invalidTableConfig, schema);
+  }
+
+  private static void assertColumnIndexes(FieldIndexConfigs fieldIndexConfigs, 
boolean dictionaryEnabled,
+      boolean invertedEnabled) { // Asserts the enabled state of the dictionary and inverted indexes for a single column.
+    
assertEquals(fieldIndexConfigs.getConfig(StandardIndexes.dictionary()).isEnabled(),
 dictionaryEnabled);
+    
assertEquals(fieldIndexConfigs.getConfig(StandardIndexes.inverted()).isEnabled(),
 invertedEnabled);
+  }
+
+  private static void assertInvalidTierOverwriteView(String tier, TableConfig 
tableConfig, Schema schema) { // Asserts TableConfigUtils.validate throws IllegalStateException whose message contains "tierOverwrites.<tier>".
+    IllegalStateException e = expectThrows(IllegalStateException.class, () -> 
TableConfigUtils.validate(tableConfig,
+        schema));
+    assertTrue(e.getMessage().contains("tierOverwrites." + tier),
+        "Expected tier override validation error, got: " + e.getMessage());
+  }
+
+  private static TableConfig tableWithConsumingOverride()
+      throws Exception { // REALTIME table whose tierOverwrites.consuming re-enables PROFILED_COLUMN's dictionary and adds a multi-column text index.
+    FieldConfig profiledFieldConfig = 
profiledFieldConfigWithTierOverwrite(CONSUMING_TIER);
+    return new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
+        .setNoDictionaryColumns(List.of(PROFILED_COLUMN)) // base config: PROFILED_COLUMN has no dictionary
+        
.setTierOverwrites(JsonUtils.stringToJsonNode("{\"consuming\":{\"noDictionaryColumns\":[],"
+            + "\"multiColumnTextIndexConfig\":{\"columns\":[\"" + 
PROFILED_COLUMN + "\"]}}}")) // consuming view: empty noDictionaryColumns, multi-column text index on PROFILED_COLUMN
+        .setFieldConfigList(List.of(profiledFieldConfig))
+        .build();
+  }
+
+  private static TableConfig tableWithRealConsumingStorageTierOverride()
+      throws Exception { // REALTIME table with consuming tierOverwrites AND a real storage tier of the same name declared in tierConfigs.
+    FieldConfig profiledFieldConfig = 
profiledFieldConfigWithTierOverwrite(CONSUMING_TIER);
+    return new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
+        .setTimeColumnName(TIME_COLUMN)
+        .setStreamConfigs(streamConfigs())
+        .setNoDictionaryColumns(List.of(PROFILED_COLUMN))
+        .setTierConfigList(List.of(consumingTierConfig())) // real storage tier named CONSUMING_TIER
+        
.setTierOverwrites(JsonUtils.stringToJsonNode("{\"consuming\":{\"noDictionaryColumns\":[]}}"))
+        .setFieldConfigList(List.of(profiledFieldConfig))
+        .build();
+  }
+
+  private static Schema schemaWithTime() { // Schema with two STRING dimensions and a LONG epoch-milliseconds date-time column.
+    return new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
+        .addSingleValueDimension(PROFILED_COLUMN, FieldSpec.DataType.STRING)
+        .addSingleValueDimension(CONTROL_COLUMN, FieldSpec.DataType.STRING)
+        .addDateTime(TIME_COLUMN, FieldSpec.DataType.LONG, 
"1:MILLISECONDS:EPOCH", "1:MILLISECONDS")
+        .build();
+  }
+
+  private static TableConfig invalidRealtimeConsumingOverride()
+      throws Exception { // Field-level consuming override only: PROFILED_COLUMN stays in noDictionaryColumns while the override requests DICTIONARY + inverted (presumably the conflict validation rejects — confirm).
+    return new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
+        .setTimeColumnName(TIME_COLUMN)
+        .setStreamConfigs(streamConfigs())
+        .setNoDictionaryColumns(List.of(PROFILED_COLUMN))
+        
.setFieldConfigList(List.of(profiledFieldConfigWithTierOverwrite(CONSUMING_TIER)))
+        .build();
+  }
+
+  private static TierConfig consumingTierConfig() { // Real time-based (30d) server storage tier that reuses the CONSUMING_TIER name.
+    return new TierConfig(CONSUMING_TIER, 
TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "30d", null,
+        TierFactory.PINOT_SERVER_STORAGE_TYPE, "consuming_tag_REALTIME", null, 
null);
+  }
+
+  private static FieldConfig profiledFieldConfigWithTierOverwrite(String tier)
+      throws Exception { // PROFILED_COLUMN is RAW by default; for the given tier it switches to DICTIONARY and enables the inverted index.
+    return new FieldConfig.Builder(PROFILED_COLUMN)
+        .withEncodingType(FieldConfig.EncodingType.RAW)
+        .withTierOverwrites(JsonUtils.stringToJsonNode("{\"" + tier + 
"\":{\"encodingType\":\"DICTIONARY\","
+            + "\"indexes\":{\"inverted\":{\"disabled\":false}}}}"))
+        .build();
+  }
+
+  private static Map<String, String> streamConfigs() { // Minimal Kafka stream config used by the REALTIME table configs that go through validation.
+    return Map.of("streamType", "kafka", "stream.kafka.topic.name", "test", 
"stream.kafka.decoder.class.name",
+        "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder");
+  }
+}
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index e56e1149da9..247b1f973df 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -1104,6 +1104,17 @@ public class TableConfigUtilsTest {
       // expected
     }
 
+    // `consuming` is reserved for the synthetic lifecycle tier, and still 
validates as a configured storage tier name
+    // for backward compatibility.
+    tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
+        .setTimeColumnName(TIME_COLUMN)
+        .setStreamConfigs(getStreamConfigs())
+        .setTierConfigList(Lists.newArrayList(
+            new TierConfig("consuming", 
TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "30d", null,
+                TierFactory.PINOT_SERVER_STORAGE_TYPE, 
"consuming_tag_REALTIME", null, null)))
+        .build();
+    TableConfigUtils.validate(tableConfig, schema);
+
     // fixedSegmentSelector
     tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
         .setTierConfigList(Lists.newArrayList(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to