This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 919f407  Handle the partitioning mismatch between table config and stream (#6031)
919f407 is described below

commit 919f40764b1057e80be1e96daed8af929601360f
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Sat Sep 19 11:56:44 2020 -0700

    Handle the partitioning mismatch between table config and stream (#6031)
    
    In the consuming segment, update the partition info when ingesting new
    records. Log a warning and emit a metric **REALTIME_PARTITION_MISMATCH**
    when the partition of a record is not aligned with the stream partition.
    The updated partition info is persisted in the segment metadata, and when
    the segment is committed, the partition info stored in the segment ZK
    metadata is updated as well.
    
    Added `SegmentPartitionLLCRealtimeClusterIntegrationTest` to test the
    expected behavior.
    
    NOTE: With the fix, the consuming segment can still be pruned out
    incorrectly if the partition info in the table config does not align with
    the stream. To fix that, we could persist the partition info only for the
    completed segments, not for the consuming segments; a perf test is needed
    to verify the performance penalty of that approach.
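    
    For reference, a minimal sketch of the detection logic described above.
    This is a hedged illustration, not the code from the diff: the class and
    method names are invented, and a plain java.util.function.ToIntFunction
    stands in for Pinot's PartitionFunction.
    
        import java.util.Set;
        import java.util.concurrent.ConcurrentHashMap;
        import java.util.function.ToIntFunction;
    
        class PartitionTracker {
          // Concurrent set: the ingestion thread may add partitions while
          // query threads read the set when building pruning metadata.
          private final Set<Integer> _partitions = ConcurrentHashMap.newKeySet();
          private final ToIntFunction<Object> _partitionFunction;
    
          PartitionTracker(ToIntFunction<Object> partitionFunction, int streamPartitionId) {
            _partitionFunction = partitionFunction;
            _partitions.add(streamPartitionId); // seeded with the stream partition
          }
    
          // Returns true exactly when a record lands in a partition not seen before,
          // i.e. the moment to log a warning and bump REALTIME_PARTITION_MISMATCH.
          boolean trackAndDetectMismatch(Object value) {
            return _partitions.add(_partitionFunction.applyAsInt(value));
          }
    
          Set<Integer> getPartitions() {
            return _partitions; // persisted into the segment metadata on commit
          }
        }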
---
 .../apache/pinot/common/metrics/ServerMeter.java   |   1 +
 .../realtime/PinotLLCRealtimeSegmentManager.java   |  31 +++-
 .../realtime/HLRealtimeSegmentDataManager.java     |   8 +-
 .../realtime/LLRealtimeSegmentDataManager.java     |  34 ++--
 .../indexsegment/mutable/MutableSegmentImpl.java   |  50 ++++--
 .../core/realtime/impl/RealtimeSegmentConfig.java  |  22 ++-
 .../index/datasource/MutableDataSource.java        |  10 +-
 .../mutable/MutableSegmentImplTestUtils.java       |  25 ++-
 ...PartitionLLCRealtimeClusterIntegrationTest.java | 185 +++++++++++++++++++++
 .../realtime/provisioning/MemoryEstimator.java     |  50 +++---
 10 files changed, 340 insertions(+), 76 deletions(-)
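
For orientation before the diff: a condensed sketch of the updated construction
path. RealtimeSegmentConfig gains a table name, and MutableSegmentImpl now takes
an optional ServerMetrics. This is adapted from MutableSegmentImplTestUtils in
the diff below; the wrapper class, method name, and all values are illustrative.

    import java.util.Collections;
    import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
    import org.apache.pinot.core.indexsegment.mutable.MutableSegmentImpl;
    import org.apache.pinot.core.io.writer.impl.DirectMemoryManager;
    import org.apache.pinot.core.realtime.impl.RealtimeSegmentConfig;
    import org.apache.pinot.core.realtime.impl.RealtimeSegmentStatsHistory;
    import org.apache.pinot.spi.data.FieldSpec.DataType;
    import org.apache.pinot.spi.data.Schema;
    import static org.mockito.Mockito.*;

    class Example {
      static MutableSegmentImpl newConsumingSegment() {
        Schema schema = new Schema.SchemaBuilder().setSchemaName("testSchema")
            .addSingleValueDimension("Carrier", DataType.STRING).build();
        // Stats history mocked as in MutableSegmentImplTestUtils
        RealtimeSegmentStatsHistory statsHistory = mock(RealtimeSegmentStatsHistory.class);
        when(statsHistory.getEstimatedCardinality(anyString())).thenReturn(200);
        when(statsHistory.getEstimatedAvgColSize(anyString())).thenReturn(32);
        RealtimeSegmentConfig config = new RealtimeSegmentConfig.Builder()
            .setTableNameWithType("testTable_REALTIME")  // new builder field in this commit
            .setSegmentName("testSegment").setStreamName("testStream").setSchema(schema)
            .setCapacity(100000).setAvgNumMultiValues(2)
            .setNoDictionaryColumns(Collections.emptySet())
            .setVarLengthDictionaryColumns(Collections.emptySet())
            .setInvertedIndexColumns(Collections.emptySet())
            .setRealtimeSegmentZKMetadata(new RealtimeSegmentZKMetadata())
            .setMemoryManager(new DirectMemoryManager("testSegment"))
            .setStatsHistory(statsHistory).build();
        // ServerMetrics is now a constructor argument; passing null (as the tools
        // and test utils do) simply disables the new mismatch meter.
        return new MutableSegmentImpl(config, null);
      }
    }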

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
index fbdc73e..c680661 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
@@ -39,6 +39,7 @@ public enum ServerMeter implements AbstractMetrics.Meter {
   REALTIME_CONSUMPTION_EXCEPTIONS("exceptions", true),
   REALTIME_OFFSET_COMMITS("commits", true),
   REALTIME_OFFSET_COMMIT_EXCEPTIONS("exceptions", false),
+  REALTIME_PARTITION_MISMATCH("mismatch", false),
   ROWS_WITH_ERRORS("rows", false),
   LLC_CONTROLLER_RESPONSE_NOT_SENT("messages", true),
   LLC_CONTROLLER_RESPONSE_COMMIT("messages", true),
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index a06e0fb..43ea74c 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -67,6 +67,7 @@ import org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegment
 import org.apache.pinot.controller.helix.core.realtime.segment.FlushThresholdUpdateManager;
 import org.apache.pinot.controller.helix.core.realtime.segment.FlushThresholdUpdater;
 import org.apache.pinot.controller.util.SegmentCompletionUtils;
+import org.apache.pinot.core.segment.index.metadata.ColumnMetadata;
 import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
 import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
@@ -512,6 +513,11 @@ public class PinotLLCRealtimeSegmentManager {
     committingSegmentZKMetadata.setIndexVersion(segmentMetadata.getVersion());
     committingSegmentZKMetadata.setTotalDocs(segmentMetadata.getTotalDocs());
 
+    // Update the partition metadata based on the segment metadata
+    // NOTE: When the stream partition changes, or the records are not properly partitioned from the stream, the
+    //       partition of the segment (based on the actual consumed records) can be different from the stream partition.
+    committingSegmentZKMetadata.setPartitionMetadata(getPartitionMetadataFromSegmentMetadata(segmentMetadata));
+
     persistSegmentZKMetadata(realtimeTableName, committingSegmentZKMetadata, stat.getVersion());
     return committingSegmentZKMetadata;
   }
@@ -578,6 +584,25 @@ public class PinotLLCRealtimeSegmentManager {
     return new SegmentPartitionMetadata(partitionMetadataMap);
   }
 
+  @Nullable
+  private SegmentPartitionMetadata getPartitionMetadataFromSegmentMetadata(SegmentMetadataImpl segmentMetadata) {
+    Map<String, ColumnPartitionMetadata> partitionMetadataMap = new HashMap<>();
+    for (Map.Entry<String, ColumnMetadata> entry : segmentMetadata.getColumnMetadataMap().entrySet()) {
+      String columnName = entry.getKey();
+      ColumnMetadata columnMetadata = entry.getValue();
+      if (columnMetadata.getPartitionFunction() != null) {
+        partitionMetadataMap.put(columnName,
+            new ColumnPartitionMetadata(columnMetadata.getPartitionFunction().toString(),
+                columnMetadata.getNumPartitions(), columnMetadata.getPartitions()));
+      }
+    }
+    if (!partitionMetadataMap.isEmpty()) {
+      return new SegmentPartitionMetadata(partitionMetadataMap);
+    } else {
+      return null;
+    }
+  }
+
   public long getCommitTimeoutMS(String realtimeTableName) {
    long commitTimeoutMS = SegmentCompletionProtocol.getMaxSegmentCommitTimeMs();
     if (_propertyStore == null) {
@@ -603,7 +628,8 @@ public class PinotLLCRealtimeSegmentManager {
   }
 
   @VisibleForTesting
-  StreamPartitionMsgOffset getPartitionOffset(StreamConfig streamConfig, OffsetCriteria offsetCriteria, int partitionId) {
+  StreamPartitionMsgOffset getPartitionOffset(StreamConfig streamConfig, OffsetCriteria offsetCriteria,
+      int partitionId) {
     PartitionOffsetFetcher partitionOffsetFetcher =
         new PartitionOffsetFetcher(offsetCriteria, partitionId, streamConfig);
     try {
@@ -1011,7 +1037,8 @@ public class PinotLLCRealtimeSegmentManager {
     LLCSegmentName newLLCSegmentName =
        new LLCSegmentName(rawTableName, partitionId, STARTING_SEQUENCE_NUMBER, creationTimeMs);
     String newSegmentName = newLLCSegmentName.getSegmentName();
-    StreamPartitionMsgOffset startOffset = getPartitionOffset(streamConfig, streamConfig.getOffsetCriteria(), partitionId);
+    StreamPartitionMsgOffset startOffset =
+        getPartitionOffset(streamConfig, streamConfig.getOffsetCriteria(), partitionId);
     CommittingSegmentDescriptor committingSegmentDescriptor =
         new CommittingSegmentDescriptor(null, startOffset.toString(), 0);
     createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, creationTimeMs,
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
index 74aea94..311d2a7 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
@@ -185,9 +185,9 @@ public class HLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     _segmentLogger.info("Started {} stream provider", _streamConfig.getType());
     final int capacity = _streamConfig.getFlushThresholdRows();
     RealtimeSegmentConfig realtimeSegmentConfig =
-        new RealtimeSegmentConfig.Builder().setSegmentName(_segmentName).setStreamName(_streamConfig.getTopicName())
-            .setSchema(schema).setTimeColumnName(_timeColumnName).setCapacity(capacity)
-            .setAvgNumMultiValues(indexLoadingConfig.getRealtimeAvgMultiValueCount())
+        new RealtimeSegmentConfig.Builder().setTableNameWithType(_tableNameWithType).setSegmentName(_segmentName)
+            .setStreamName(_streamConfig.getTopicName()).setSchema(schema).setTimeColumnName(_timeColumnName)
+            .setCapacity(capacity).setAvgNumMultiValues(indexLoadingConfig.getRealtimeAvgMultiValueCount())
             .setNoDictionaryColumns(indexLoadingConfig.getNoDictionaryColumns())
             .setVarLengthDictionaryColumns(indexLoadingConfig.getVarLengthDictionaryColumns())
             .setInvertedIndexColumns(invertedIndexColumns).setRealtimeSegmentZKMetadata(realtimeSegmentZKMetadata)
@@ -197,7 +197,7 @@ public class HLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
                 indexLoadingConfig.isDirectRealtimeOffHeapAllocation(), serverMetrics))
             .setStatsHistory(realtimeTableDataManager.getStatsHistory())
             .setNullHandlingEnabled(indexingConfig.isNullHandlingEnabled()).build();
-    _realtimeSegment = new MutableSegmentImpl(realtimeSegmentConfig);
+    _realtimeSegment = new MutableSegmentImpl(realtimeSegmentConfig, serverMetrics);
 
     _notifier = realtimeTableDataManager;
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index fc43ba2..1363381 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -364,7 +364,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     final long idlePipeSleepTimeMillis = 100;
    final long maxIdleCountBeforeStatUpdate = (3 * 60 * 1000) / (idlePipeSleepTimeMillis + _partitionLevelStreamConfig
        .getFetchTimeoutMillis());  // 3 minute count
-    StreamPartitionMsgOffset lastUpdatedOffset = _streamPartitionMsgOffsetFactory.create(_currentOffset);  // so that we always update the metric when we enter this method.
+    StreamPartitionMsgOffset lastUpdatedOffset = _streamPartitionMsgOffsetFactory
+        .create(_currentOffset);  // so that we always update the metric when we enter this method.
     long consecutiveIdleCount = 0;
     // At this point, we know that we can potentially move the offset, so the old saved segment file is not valid
     // anymore. Remove the file if it exists.
@@ -473,7 +474,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
              GenericRow transformedRow = _recordTransformer.transform((GenericRow) singleRow);
              if (transformedRow != null && IngestionUtils.shouldIngestRow(transformedRow)) {
                realtimeRowsConsumedMeter = _serverMetrics
-                    .addMeteredTableValue(_metricKeyName, ServerMeter.REALTIME_ROWS_CONSUMED, 1, realtimeRowsConsumedMeter);
+                    .addMeteredTableValue(_metricKeyName, ServerMeter.REALTIME_ROWS_CONSUMED, 1,
+                        realtimeRowsConsumedMeter);
                 indexedMessageCount++;
                 canTakeMore = _realtimeSegment.index(transformedRow, msgMetadata);
               } else {
@@ -486,7 +488,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
             GenericRow transformedRow = _recordTransformer.transform(decodedRow);
             if (transformedRow != null && IngestionUtils.shouldIngestRow(transformedRow)) {
               realtimeRowsConsumedMeter = _serverMetrics
-                  .addMeteredTableValue(_metricKeyName, ServerMeter.REALTIME_ROWS_CONSUMED, 1, realtimeRowsConsumedMeter);
+                  .addMeteredTableValue(_metricKeyName, ServerMeter.REALTIME_ROWS_CONSUMED, 1,
+                      realtimeRowsConsumedMeter);
               indexedMessageCount++;
               canTakeMore = _realtimeSegment.index(transformedRow, msgMetadata);
             } else {
@@ -901,8 +904,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
   protected void postStopConsumedMsg(String reason) {
     do {
       SegmentCompletionProtocol.Request.Params params = new SegmentCompletionProtocol.Request.Params();
-      params.withStreamPartitionMsgOffset(_currentOffset.toString()).withReason(reason)
-          .withSegmentName(_segmentNameStr).withInstanceId(_instanceId);
+      params.withStreamPartitionMsgOffset(_currentOffset.toString()).withReason(reason).withSegmentName(_segmentNameStr)
+          .withInstanceId(_instanceId);
 
      SegmentCompletionProtocol.Response response = _protocolHandler.segmentStoppedConsuming(params);
       if (response.getStatus() == SegmentCompletionProtocol.ControllerResponseStatus.PROCESSED) {
@@ -919,7 +922,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     // Retry maybe once if leader is not found.
     SegmentCompletionProtocol.Request.Params params = new SegmentCompletionProtocol.Request.Params();
     params.withStreamPartitionMsgOffset(_currentOffset.toString()).withSegmentName(_segmentNameStr)
-        .withReason(_stopReason) .withNumRows(_numRowsConsumed).withInstanceId(_instanceId);
+        .withReason(_stopReason).withNumRows(_numRowsConsumed).withInstanceId(_instanceId);
     if (_isOffHeap) {
       params.withMemoryUsedBytes(_memoryManager.getTotalAllocatedBytes());
     }
@@ -1165,9 +1168,9 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     // Start new realtime segment
     String consumerDir = realtimeTableDataManager.getConsumerDir();
     RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder =
-        new RealtimeSegmentConfig.Builder().setSegmentName(_segmentNameStr).setStreamName(_streamTopic)
-            .setSchema(_schema).setTimeColumnName(timeColumnName).setCapacity(_segmentMaxRowCount)
-            .setAvgNumMultiValues(indexLoadingConfig.getRealtimeAvgMultiValueCount())
+        new RealtimeSegmentConfig.Builder().setTableNameWithType(_tableNameWithType).setSegmentName(_segmentNameStr)
+            .setStreamName(_streamTopic).setSchema(_schema).setTimeColumnName(timeColumnName)
+            .setCapacity(_segmentMaxRowCount).setAvgNumMultiValues(indexLoadingConfig.getRealtimeAvgMultiValueCount())
             .setNoDictionaryColumns(indexLoadingConfig.getNoDictionaryColumns())
             .setVarLengthDictionaryColumns(indexLoadingConfig.getVarLengthDictionaryColumns())
             .setInvertedIndexColumns(invertedIndexColumns).setTextIndexColumns(textIndexColumns)
@@ -1204,6 +1207,12 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
         String partitionColumn = entry.getKey();
         ColumnPartitionConfig columnPartitionConfig = entry.getValue();
         String partitionFunctionName = columnPartitionConfig.getFunctionName();
+
+        // NOTE: Here we compare the number of partitions from the config and the stream, and log a warning and emit a
+        //       metric when they don't match, but use the one from the stream. The mismatch could happen when the
+        //       stream partitions are changed, but the table config has not been updated to reflect the change. In such
+        //       case, picking the number of partitions from the stream can keep the segment properly partitioned as
+        //       long as the partition function is not changed.
         int numPartitions = columnPartitionConfig.getNumPartitions();
         try {
           int numStreamPartitions = _streamMetadataProvider.fetchPartitionCount(/*maxWaitTimeMs=*/5000L);
@@ -1211,6 +1220,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
             segmentLogger.warn(
                "Number of stream partitions: {} does not match number of partitions in the partition config: {}, using number of stream partitions",
                 numStreamPartitions, numPartitions);
+            _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.REALTIME_PARTITION_MISMATCH, 1);
             numPartitions = numStreamPartitions;
           }
         } catch (Exception e) {
@@ -1219,6 +1229,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
               numPartitions, e);
           makeStreamMetadataProvider("Timeout getting number of stream partitions");
         }
+
         realtimeSegmentConfigBuilder.setPartitionColumn(partitionColumn);
        realtimeSegmentConfigBuilder
             .setPartitionFunction(PartitionFunctionFactory.getPartitionFunction(partitionFunctionName, numPartitions));
@@ -1228,7 +1239,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
       }
     }
 
-    _realtimeSegment = new MutableSegmentImpl(realtimeSegmentConfigBuilder.build());
+    _realtimeSegment = new MutableSegmentImpl(realtimeSegmentConfigBuilder.build(), serverMetrics);
     _startOffset = _streamPartitionMsgOffsetFactory.create(_segmentZKMetadata.getStartOffset());
     _currentOffset = _streamPartitionMsgOffsetFactory.create(_startOffset);
     _resourceTmpDir = new File(resourceDataDir, "_tmp");
@@ -1256,7 +1267,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
       _consumeEndTime = now + minConsumeTimeMillis;
     }
 
-    _segmentCommitterFactory = new SegmentCommitterFactory(segmentLogger, _protocolHandler, tableConfig, indexLoadingConfig, serverMetrics);
+    _segmentCommitterFactory =
+        new SegmentCommitterFactory(segmentLogger, _protocolHandler, tableConfig, indexLoadingConfig, serverMetrics);
 
     segmentLogger
         .info("Starting consumption on realtime consuming segment {} 
maxRowCount {} maxEndTime {}", _llcSegmentName,
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
index f4fd34f..57534cf 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
@@ -35,6 +35,8 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import javax.annotation.Nullable;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.common.metrics.ServerMeter;
+import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.core.common.DataSource;
 import org.apache.pinot.core.data.partition.PartitionFunction;
 import org.apache.pinot.core.io.readerwriter.PinotDataBufferMemoryManager;
@@ -96,7 +98,9 @@ public class MutableSegmentImpl implements MutableSegment {
 
   private final Logger _logger;
   private final long _startTimeMillis = System.currentTimeMillis();
+  private final ServerMetrics _serverMetrics;
 
+  private final String _tableNameWithType;
   private final String _segmentName;
   private final Schema _schema;
   private final String _timeColumnName;
@@ -107,7 +111,6 @@ public class MutableSegmentImpl implements MutableSegment {
   private final RealtimeSegmentStatsHistory _statsHistory;
   private final String _partitionColumn;
   private final PartitionFunction _partitionFunction;
-  private final int _partitionId;
   private final boolean _nullHandlingEnabled;
 
   private final Map<String, IndexContainer> _indexContainerMap = new 
HashMap<>();
@@ -133,7 +136,10 @@ public class MutableSegmentImpl implements MutableSegment {
   private final Map<String, FieldSpec> _newlyAddedColumnsFieldMap = new 
ConcurrentHashMap();
   private final Map<String, FieldSpec> _newlyAddedPhysicalColumnsFieldMap = 
new ConcurrentHashMap();
 
-  public MutableSegmentImpl(RealtimeSegmentConfig config) {
+  public MutableSegmentImpl(RealtimeSegmentConfig config, @Nullable ServerMetrics serverMetrics) {
+    _serverMetrics = serverMetrics;
+
+    _tableNameWithType = config.getTableNameWithType();
     _segmentName = config.getSegmentName();
     _schema = config.getSchema();
     _timeColumnName = config.getTimeColumnName();
@@ -160,7 +166,6 @@ public class MutableSegmentImpl implements MutableSegment {
     _statsHistory = config.getStatsHistory();
     _partitionColumn = config.getPartitionColumn();
     _partitionFunction = config.getPartitionFunction();
-    _partitionId = config.getPartitionId();
     _nullHandlingEnabled = config.isNullHandlingEnabled();
     _aggregateMetrics = config.aggregateMetrics();
 
@@ -205,10 +210,16 @@ public class MutableSegmentImpl implements MutableSegment {
 
       // Partition info
       PartitionFunction partitionFunction = null;
-      int partitionId = 0;
+      Set<Integer> partitions = null;
       if (column.equals(_partitionColumn)) {
         partitionFunction = _partitionFunction;
-        partitionId = _partitionId;
+
+        // NOTE: Use a concurrent set because the partitions can be updated when the partition of the ingested record
+        //       does not match the stream partition. This could happen when stream partition changes, or the records
+        //       are not properly partitioned from the stream. Log a warning and emit a metric if it happens, then add
+        //       the new partition into this set.
+        partitions = ConcurrentHashMap.newKeySet();
+        partitions.add(config.getPartitionId());
       }
 
       // Check whether to generate raw index for the column while consuming
@@ -297,7 +308,7 @@ public class MutableSegmentImpl implements MutableSegment {
 
       // TODO: Support range index and bloom filter for mutable segment
       _indexContainerMap.put(column,
-          new IndexContainer(fieldSpec, partitionFunction, partitionId, new NumValuesInfo(), forwardIndex, dictionary,
+          new IndexContainer(fieldSpec, partitionFunction, partitions, new NumValuesInfo(), forwardIndex, dictionary,
               invertedIndexReader, null, textIndex, null, nullValueVector));
     }
 
@@ -472,6 +483,17 @@ public class MutableSegmentImpl implements MutableSegment {
       if (fieldSpec.isSingleValueField()) {
         // Single-value column
 
+        // Check partitions
+        if (column.equals(_partitionColumn)) {
+          int partition = _partitionFunction.getPartition(value);
+          if (indexContainer._partitions.add(partition)) {
+            _logger.warn("Found new partition: {} from partition column: {}, 
value: {}", partition, column, value);
+            if (_serverMetrics != null) {
+              _serverMetrics.addMeteredTableValue(_tableNameWithType, 
ServerMeter.REALTIME_PARTITION_MISMATCH, 1);
+            }
+          }
+        }
+
         // Update numValues info
         indexContainer._numValuesInfo.updateSVEntry();
 
@@ -967,7 +989,7 @@ public class MutableSegmentImpl implements MutableSegment {
   private class IndexContainer implements Closeable {
     final FieldSpec _fieldSpec;
     final PartitionFunction _partitionFunction;
-    final int _partitionId;
+    final Set<Integer> _partitions;
     final NumValuesInfo _numValuesInfo;
     final MutableForwardIndex _forwardIndex;
     final BaseMutableDictionary _dictionary;
@@ -984,14 +1006,14 @@ public class MutableSegmentImpl implements MutableSegment {
     int _dictId = Integer.MIN_VALUE;
     int[] _dictIds;
 
-    IndexContainer(FieldSpec fieldSpec, @Nullable PartitionFunction partitionFunction, int partitionId,
-        NumValuesInfo numValuesInfo, MutableForwardIndex forwardIndex, @Nullable BaseMutableDictionary dictionary,
-        @Nullable RealtimeInvertedIndexReader invertedIndex, @Nullable InvertedIndexReader rangeIndex,
-        @Nullable RealtimeLuceneTextIndexReader textIndex, @Nullable BloomFilterReader bloomFilter,
-        @Nullable MutableNullValueVector nullValueVector) {
+    IndexContainer(FieldSpec fieldSpec, @Nullable PartitionFunction partitionFunction,
+        @Nullable Set<Integer> partitions, NumValuesInfo numValuesInfo, MutableForwardIndex forwardIndex,
+        @Nullable BaseMutableDictionary dictionary, @Nullable RealtimeInvertedIndexReader invertedIndex,
+        @Nullable InvertedIndexReader rangeIndex, @Nullable RealtimeLuceneTextIndexReader textIndex,
+        @Nullable BloomFilterReader bloomFilter, @Nullable MutableNullValueVector nullValueVector) {
       _fieldSpec = fieldSpec;
       _partitionFunction = partitionFunction;
-      _partitionId = partitionId;
+      _partitions = partitions;
       _numValuesInfo = numValuesInfo;
       _forwardIndex = forwardIndex;
       _dictionary = dictionary;
@@ -1004,7 +1026,7 @@ public class MutableSegmentImpl implements MutableSegment {
 
     DataSource toDataSource() {
      return new MutableDataSource(_fieldSpec, _numDocsIndexed, _numValuesInfo._numValues,
-          _numValuesInfo._maxNumValuesPerMVEntry, _partitionFunction, _partitionId, _minValue, _maxValue, _forwardIndex,
+          _numValuesInfo._maxNumValuesPerMVEntry, _partitionFunction, _partitions, _minValue, _maxValue, _forwardIndex,
           _dictionary, _invertedIndex, _rangeIndex, _textIndex, _bloomFilter, _nullValueVector);
     }
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/RealtimeSegmentConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/RealtimeSegmentConfig.java
index 0f97048..d9c8d0f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/RealtimeSegmentConfig.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/RealtimeSegmentConfig.java
@@ -27,6 +27,7 @@ import org.apache.pinot.spi.data.Schema;
 
 
 public class RealtimeSegmentConfig {
+  private final String _tableNameWithType;
   private final String _segmentName;
   private final String _streamName;
   private final Schema _schema;
@@ -49,12 +50,13 @@ public class RealtimeSegmentConfig {
   private final String _consumerDir;
 
  // TODO: Clean up this constructor. Most of these things can be extracted from tableConfig.
-  private RealtimeSegmentConfig(String segmentName, String streamName, Schema schema, String timeColumnName,
-      int capacity, int avgNumMultiValues, Set<String> noDictionaryColumns, Set<String> varLengthDictionaryColumns,
-      Set<String> invertedIndexColumns, Set<String> textIndexColumns,
+  private RealtimeSegmentConfig(String tableNameWithType, String segmentName, String streamName, Schema schema,
+      String timeColumnName, int capacity, int avgNumMultiValues, Set<String> noDictionaryColumns,
+      Set<String> varLengthDictionaryColumns, Set<String> invertedIndexColumns, Set<String> textIndexColumns,
       RealtimeSegmentZKMetadata realtimeSegmentZKMetadata, boolean offHeap, PinotDataBufferMemoryManager memoryManager,
       RealtimeSegmentStatsHistory statsHistory, String partitionColumn, PartitionFunction partitionFunction,
       int partitionId, boolean aggregateMetrics, boolean nullHandlingEnabled, String consumerDir) {
+    _tableNameWithType = tableNameWithType;
     _segmentName = segmentName;
     _streamName = streamName;
     _schema = schema;
@@ -77,6 +79,10 @@ public class RealtimeSegmentConfig {
     _consumerDir = consumerDir;
   }
 
+  public String getTableNameWithType() {
+    return _tableNameWithType;
+  }
+
   public String getSegmentName() {
     return _segmentName;
   }
@@ -163,6 +169,7 @@ public class RealtimeSegmentConfig {
   }
 
   public static class Builder {
+    private String _tableNameWithType;
     private String _segmentName;
     private String _streamName;
     private Schema _schema;
@@ -187,6 +194,11 @@ public class RealtimeSegmentConfig {
     public Builder() {
     }
 
+    public Builder setTableNameWithType(String tableNameWithType) {
+      _tableNameWithType = tableNameWithType;
+      return this;
+    }
+
     public Builder setSegmentName(String segmentName) {
       _segmentName = segmentName;
       return this;
@@ -296,8 +308,8 @@ public class RealtimeSegmentConfig {
     }
 
     public RealtimeSegmentConfig build() {
-      return new RealtimeSegmentConfig(_segmentName, _streamName, _schema, _timeColumnName, _capacity,
-          _avgNumMultiValues, _noDictionaryColumns, _varLengthDictionaryColumns, _invertedIndexColumns,
+      return new RealtimeSegmentConfig(_tableNameWithType, _segmentName, _streamName, _schema, _timeColumnName,
+          _capacity, _avgNumMultiValues, _noDictionaryColumns, _varLengthDictionaryColumns, _invertedIndexColumns,
           _textIndexColumns, _realtimeSegmentZKMetadata, _offHeap, _memoryManager, _statsHistory, _partitionColumn,
           _partitionFunction, _partitionId, _aggregateMetrics, _nullHandlingEnabled, _consumerDir);
     }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/MutableDataSource.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/MutableDataSource.java
index 3a6b3a2..a927353 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/MutableDataSource.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/MutableDataSource.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pinot.core.segment.index.datasource;
 
-import java.util.Collections;
 import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.pinot.core.common.DataSourceMetadata;
@@ -35,16 +34,17 @@ import org.apache.pinot.spi.data.FieldSpec;
 /**
 * The {@code MutableDataSource} class is the data source for a column in the mutable segment.
  */
+@SuppressWarnings("rawtypes")
 public class MutableDataSource extends BaseDataSource {
 
  public MutableDataSource(FieldSpec fieldSpec, int numDocs, int numValues, int maxNumValuesPerMVEntry,
-      @Nullable PartitionFunction partitionFunction, int partitionId, @Nullable Comparable minValue,
+      @Nullable PartitionFunction partitionFunction, @Nullable Set<Integer> partitions, @Nullable Comparable minValue,
       @Nullable Comparable maxValue, ForwardIndexReader forwardIndex, @Nullable Dictionary dictionary,
       @Nullable InvertedIndexReader invertedIndex, @Nullable InvertedIndexReader rangeIndex,
       @Nullable TextIndexReader textIndex, @Nullable BloomFilterReader bloomFilter,
       @Nullable NullValueVectorReader nullValueVector) {
     super(new MutableDataSourceMetadata(fieldSpec, numDocs, numValues, maxNumValuesPerMVEntry, partitionFunction,
-            partitionId, minValue, maxValue), forwardIndex, dictionary, invertedIndex, rangeIndex, textIndex, bloomFilter,
+            partitions, minValue, maxValue), forwardIndex, dictionary, invertedIndex, rangeIndex, textIndex, bloomFilter,
         nullValueVector);
   }
 
@@ -59,7 +59,7 @@ public class MutableDataSource extends BaseDataSource {
     final Comparable _maxValue;
 
    MutableDataSourceMetadata(FieldSpec fieldSpec, int numDocs, int numValues, int maxNumValuesPerMVEntry,
-        @Nullable PartitionFunction partitionFunction, int partitionId, @Nullable Comparable minValue,
+        @Nullable PartitionFunction partitionFunction, @Nullable Set<Integer> partitions, @Nullable Comparable minValue,
         @Nullable Comparable maxValue) {
       _fieldSpec = fieldSpec;
       _numDocs = numDocs;
@@ -67,7 +67,7 @@ public class MutableDataSource extends BaseDataSource {
       _maxNumValuesPerMVEntry = maxNumValuesPerMVEntry;
       if (partitionFunction != null) {
         _partitionFunction = partitionFunction;
-        _partitions = Collections.singleton(partitionId);
+        _partitions = partitions;
       } else {
         _partitionFunction = null;
         _partitions = null;
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplTestUtils.java b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplTestUtils.java
index e4df760..e7a7363 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplTestUtils.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplTestUtils.java
@@ -19,7 +19,6 @@
 package org.apache.pinot.core.indexsegment.mutable;
 
 import java.util.Set;
-import javax.annotation.Nonnull;
 import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
 import org.apache.pinot.core.io.writer.impl.DirectMemoryManager;
 import org.apache.pinot.core.realtime.impl.RealtimeSegmentConfig;
@@ -35,30 +34,30 @@ public class MutableSegmentImplTestUtils {
   private MutableSegmentImplTestUtils() {
   }
 
-  private static final String STEAM_NAME = "testStream";
+  private static final String TABLE_NAME_WITH_TYPE = "testTable_REALTIME";
   private static final String SEGMENT_NAME = "testSegment";
+  private static final String STEAM_NAME = "testStream";
 
-  public static MutableSegmentImpl createMutableSegmentImpl(@Nonnull Schema schema,
-      @Nonnull Set<String> noDictionaryColumns, @Nonnull Set<String> varLengthDictionaryColumns,
-      @Nonnull Set<String> invertedIndexColumns, boolean aggregateMetrics) {
+  public static MutableSegmentImpl createMutableSegmentImpl(Schema schema, Set<String> noDictionaryColumns,
+      Set<String> varLengthDictionaryColumns, Set<String> invertedIndexColumns, boolean aggregateMetrics) {
     return createMutableSegmentImpl(schema, noDictionaryColumns, varLengthDictionaryColumns, invertedIndexColumns,
         aggregateMetrics, false);
   }
 
-  public static MutableSegmentImpl createMutableSegmentImpl(@Nonnull Schema schema,
-      @Nonnull Set<String> noDictionaryColumns, @Nonnull Set<String> varLengthDictionaryColumns,
-      @Nonnull Set<String> invertedIndexColumns, boolean aggregateMetrics, boolean nullHandlingEnabled) {
+  public static MutableSegmentImpl createMutableSegmentImpl(Schema schema, Set<String> noDictionaryColumns,
+      Set<String> varLengthDictionaryColumns, Set<String> invertedIndexColumns, boolean aggregateMetrics,
+      boolean nullHandlingEnabled) {
     RealtimeSegmentStatsHistory statsHistory = mock(RealtimeSegmentStatsHistory.class);
     when(statsHistory.getEstimatedCardinality(anyString())).thenReturn(200);
     when(statsHistory.getEstimatedAvgColSize(anyString())).thenReturn(32);
 
     RealtimeSegmentConfig realtimeSegmentConfig =
-        new RealtimeSegmentConfig.Builder().setSegmentName(SEGMENT_NAME).setStreamName(STEAM_NAME).setSchema(schema)
-            .setCapacity(100000).setAvgNumMultiValues(2).setNoDictionaryColumns(noDictionaryColumns)
-            .setVarLengthDictionaryColumns(varLengthDictionaryColumns).setInvertedIndexColumns(invertedIndexColumns)
-            .setRealtimeSegmentZKMetadata(new RealtimeSegmentZKMetadata())
+        new RealtimeSegmentConfig.Builder().setTableNameWithType(TABLE_NAME_WITH_TYPE).setSegmentName(SEGMENT_NAME)
+            .setStreamName(STEAM_NAME).setSchema(schema).setCapacity(100000).setAvgNumMultiValues(2)
+            .setNoDictionaryColumns(noDictionaryColumns).setVarLengthDictionaryColumns(varLengthDictionaryColumns)
+            .setInvertedIndexColumns(invertedIndexColumns).setRealtimeSegmentZKMetadata(new RealtimeSegmentZKMetadata())
             .setMemoryManager(new DirectMemoryManager(SEGMENT_NAME)).setStatsHistory(statsHistory)
             .setAggregateMetrics(aggregateMetrics).setNullHandlingEnabled(nullHandlingEnabled).build();
-    return new MutableSegmentImpl(realtimeSegmentConfig);
+    return new MutableSegmentImpl(realtimeSegmentConfig, null);
   }
 }
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java
new file mode 100644
index 0000000..c520599
--- /dev/null
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.metadata.segment.ColumnPartitionMetadata;
+import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
+import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
+import org.apache.pinot.common.utils.CommonConstants.Segment.Realtime.Status;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
+import org.apache.pinot.spi.config.table.IndexingConfig;
+import org.apache.pinot.spi.config.table.RoutingConfig;
+import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.util.TestUtils;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Integration test that enables segment partition for the LLC real-time table.
+ */
+public class SegmentPartitionLLCRealtimeClusterIntegrationTest extends BaseClusterIntegrationTest {
+  // Number of documents in the first Avro file
+  private static final long NUM_DOCS = 9292;
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir);
+
+    // Start the Pinot cluster
+    startZk();
+    startController();
+    startBroker();
+    startServer();
+
+    // Start Kafka
+    startKafka();
+
+    // Unpack the Avro files
+    List<File> avroFiles = unpackAvroData(_tempDir);
+
+    // Create and upload the schema and table config with reduced number of columns and partition config
+    Schema schema =
+        new Schema.SchemaBuilder().setSchemaName(getSchemaName()).addSingleValueDimension("Carrier", DataType.STRING)
+            .addDateTime("DaysSinceEpoch", DataType.INT, "1:DAYS:EPOCH", "1:DAYS").build();
+    addSchema(schema);
+
+    TableConfig tableConfig = createRealtimeTableConfig(avroFiles.get(0));
+    IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
+    indexingConfig.setSegmentPartitionConfig(
+        new SegmentPartitionConfig(Collections.singletonMap("Carrier", new ColumnPartitionConfig("murmur", 5))));
+    tableConfig.setRoutingConfig(
+        new RoutingConfig(null, Collections.singletonList(RoutingConfig.PARTITION_SEGMENT_PRUNER_TYPE), null));
+    addTableConfig(tableConfig);
+
+    // Push data into Kafka (only ingest the first Avro file)
+    pushAvroIntoKafka(Collections.singletonList(avroFiles.get(0)));
+
+    // Wait for all documents loaded
+    waitForAllDocsLoaded(600_000L);
+  }
+
+  @Override
+  protected long getCountStarResult() {
+    return NUM_DOCS;
+  }
+
+  @Override
+  protected boolean useLlc() {
+    return true;
+  }
+
+  @Nullable
+  @Override
+  protected List<String> getInvertedIndexColumns() {
+    return null;
+  }
+
+  @Nullable
+  @Override
+  protected List<String> getNoDictionaryColumns() {
+    return null;
+  }
+
+  @Nullable
+  @Override
+  protected List<String> getRangeIndexColumns() {
+    return null;
+  }
+
+  @Nullable
+  @Override
+  protected List<String> getBloomFilterColumns() {
+    return null;
+  }
+
+  @Test
+  public void testPartitionMetadata() {
+    List<RealtimeSegmentZKMetadata> segmentZKMetadataList =
+        _helixResourceManager.getRealtimeSegmentMetadata(getTableName());
+    for (RealtimeSegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) {
+      SegmentPartitionMetadata segmentPartitionMetadata = segmentZKMetadata.getPartitionMetadata();
+      assertNotNull(segmentPartitionMetadata);
+      Map<String, ColumnPartitionMetadata> columnPartitionMetadataMap =
+          segmentPartitionMetadata.getColumnPartitionMap();
+      assertEquals(columnPartitionMetadataMap.size(), 1);
+      ColumnPartitionMetadata columnPartitionMetadata = columnPartitionMetadataMap.get("Carrier");
+      assertNotNull(columnPartitionMetadata);
+
+      // The function name should be aligned with the partition config in the table config
+      assertTrue(columnPartitionMetadata.getFunctionName().equalsIgnoreCase("murmur"));
+
+      if (segmentZKMetadata.getStatus() == Status.IN_PROGRESS) {
+        // Consuming segment
+
+        // Number of partitions should be aligned with the partition config in the table config
+        assertEquals(columnPartitionMetadata.getNumPartitions(), 5);
+
+        // Should contain only the stream partition
+        assertEquals(columnPartitionMetadata.getPartitions(),
+            Collections.singleton(new LLCSegmentName(segmentZKMetadata.getSegmentName()).getPartitionId()));
+      } else {
+        // Completed segment
+
+        // Number of partitions should be the same as number of stream partitions
+        assertEquals(columnPartitionMetadata.getNumPartitions(), 2);
+
+        // Should contain the partitions based on the ingested records. Since the records are not partitioned in Kafka,
+        // it should contain all the partitions.
+        assertEquals(columnPartitionMetadata.getPartitions(), new HashSet<>(Arrays.asList(0, 1)));
+      }
+    }
+  }
+
+  // TODO: Add test on partition routing once the consuming segment behavior is fixed.
+  //       Currently the partition info is cached in the PartitionSegmentPruner, and won't be reloaded when the
+  //       consuming segment gets committed. The segment will be pruned based on the consuming segment partition info
+  //       (using stream partition as the segment partition), even if the partition info changed for the completed
+  //       segment.
+
+  @AfterClass
+  public void tearDown()
+      throws Exception {
+    dropRealtimeTable(getTableName());
+    stopServer();
+    stopBroker();
+    stopController();
+    stopKafka();
+    stopZk();
+    FileUtils.deleteDirectory(_tempDir);
+  }
+}
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimator.java b/pinot-tools/src/main/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimator.java
index 874d4c2..ef0484f 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimator.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimator.java
@@ -85,7 +85,7 @@ public class MemoryEstimator {
       throw new RuntimeException("Caught exception when reading segment index 
dir", e);
     }
     _totalDocsInSampleSegment = _segmentMetadata.getTotalDocs();
-    _sampleSegmentConsumedSeconds = (int)(_totalDocsInSampleSegment/ingestionRate);
+    _sampleSegmentConsumedSeconds = (int) (_totalDocsInSampleSegment / ingestionRate);
 
    if (CollectionUtils.isNotEmpty(_tableConfig.getIndexingConfig().getNoDictionaryColumns())) {
      _noDictionaryColumns.addAll(_tableConfig.getIndexingConfig().getNoDictionaryColumns());
@@ -130,7 +130,8 @@ public class MemoryEstimator {
 
     // create a config
     RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder =
-        new RealtimeSegmentConfig.Builder().setSegmentName(_segmentMetadata.getName()).setStreamName(_tableNameWithType)
+        new RealtimeSegmentConfig.Builder().setTableNameWithType(_tableNameWithType)
+            .setSegmentName(_segmentMetadata.getName()).setStreamName(_tableNameWithType)
             .setSchema(_segmentMetadata.getSchema()).setCapacity(_segmentMetadata.getTotalDocs())
             .setAvgNumMultiValues(_avgMultiValues).setNoDictionaryColumns(_noDictionaryColumns)
             .setVarLengthDictionaryColumns(_varLengthDictionaryColumns).setInvertedIndexColumns(_invertedIndexColumns)
@@ -138,13 +139,13 @@ public class MemoryEstimator {
             .setStatsHistory(sampleStatsHistory);
 
     // create mutable segment impl
-    MutableSegmentImpl mutableSegmentImpl = new MutableSegmentImpl(realtimeSegmentConfigBuilder.build());
+    MutableSegmentImpl mutableSegmentImpl = new MutableSegmentImpl(realtimeSegmentConfigBuilder.build(), null);
 
     // read all rows and index them
    try (PinotSegmentRecordReader segmentRecordReader = new PinotSegmentRecordReader(_sampleCompletedSegment)) {
       GenericRow row = new GenericRow();
       while (segmentRecordReader.hasNext()) {
-        segmentRecordReader.next(row);
+        row = segmentRecordReader.next(row);
         mutableSegmentImpl.index(row, null);
         row.clear();
       }
@@ -224,9 +225,10 @@ public class MemoryEstimator {
 
      memoryForConsumingSegmentPerPartition += getMemoryForInvertedIndex(memoryForConsumingSegmentPerPartition);
 
-      int numActiveSegmentsPerPartition = (retentionHours  + numHoursToConsume - 1)/numHoursToConsume;
-      long activeMemoryForCompletedSegmentsPerPartition = completedSegmentSizeBytes * (numActiveSegmentsPerPartition - 1);
-      int numCompletedSegmentsPerPartition = (_tableRetentionHours + numHoursToConsume - 1)/numHoursToConsume - 1;
+      int numActiveSegmentsPerPartition = (retentionHours + numHoursToConsume - 1) / numHoursToConsume;
+      long activeMemoryForCompletedSegmentsPerPartition =
+          completedSegmentSizeBytes * (numActiveSegmentsPerPartition - 1);
+      int numCompletedSegmentsPerPartition = (_tableRetentionHours + numHoursToConsume - 1) / numHoursToConsume - 1;
 
       for (int j = 0; j < numHosts.length; j++) {
         int numHostsToProvision = numHosts[j];
@@ -238,22 +240,26 @@ public class MemoryEstimator {
            activeMemoryForCompletedSegmentsPerPartition * totalConsumingPartitionsPerHost;
         long totalMemoryForConsumingSegmentsPerHost =
             memoryForConsumingSegmentPerPartition * totalConsumingPartitionsPerHost;
-        long activeMemoryPerHostBytes = activeMemoryForCompletedSegmentsPerHost + totalMemoryForConsumingSegmentsPerHost;
-        long mappedMemoryPerHost = totalMemoryForConsumingSegmentsPerHost +
-            (numCompletedSegmentsPerPartition * totalConsumingPartitionsPerHost * completedSegmentSizeBytes);
+        long activeMemoryPerHostBytes =
+            activeMemoryForCompletedSegmentsPerHost + totalMemoryForConsumingSegmentsPerHost;
+        long mappedMemoryPerHost =
+            totalMemoryForConsumingSegmentsPerHost + (numCompletedSegmentsPerPartition * totalConsumingPartitionsPerHost
+                * completedSegmentSizeBytes);
 
         if (activeMemoryPerHostBytes <= _maxUsableHostMemory) {
-          _activeMemoryPerHost[i][j] = DataSizeUtils.fromBytes(activeMemoryPerHostBytes)
-              + "/" + DataSizeUtils.fromBytes(mappedMemoryPerHost);
+          _activeMemoryPerHost[i][j] =
+              DataSizeUtils.fromBytes(activeMemoryPerHostBytes) + "/" + DataSizeUtils.fromBytes(mappedMemoryPerHost);
           _consumingMemoryPerHost[i][j] = DataSizeUtils.fromBytes(totalMemoryForConsumingSegmentsPerHost);
           _optimalSegmentSize[i][j] = DataSizeUtils.fromBytes(completedSegmentSizeBytes);
-          _numSegmentsQueriedPerHost[i][j] = String.valueOf(numActiveSegmentsPerPartition * totalConsumingPartitionsPerHost);
+          _numSegmentsQueriedPerHost[i][j] =
+              String.valueOf(numActiveSegmentsPerPartition * totalConsumingPartitionsPerHost);
         }
       }
     }
   }
 
-  private long getMemoryForConsumingSegmentPerPartition(File statsFile, int totalDocs) throws IOException {
+  private long getMemoryForConsumingSegmentPerPartition(File statsFile, int totalDocs)
+      throws IOException {
     // We don't want the stats history to get updated from all our dummy runs
     // So we copy over the original stats history every time we start
     File statsFileCopy = new File(_tableDataDir, STATS_FILE_COPY_NAME);
@@ -269,15 +275,15 @@ public class MemoryEstimator {
    RealtimeSegmentZKMetadata segmentZKMetadata = getRealtimeSegmentZKMetadata(_segmentMetadata, totalDocs);
 
     RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder =
-        new RealtimeSegmentConfig.Builder().setSegmentName(_segmentMetadata.getName())
-            .setStreamName(_tableNameWithType).setSchema(_segmentMetadata.getSchema()).setCapacity(totalDocs)
-            .setAvgNumMultiValues(_avgMultiValues).setNoDictionaryColumns(_noDictionaryColumns)
-            .setVarLengthDictionaryColumns(_varLengthDictionaryColumns).setInvertedIndexColumns(_invertedIndexColumns)
-            .setRealtimeSegmentZKMetadata(segmentZKMetadata).setOffHeap(true).setMemoryManager(memoryManager)
-            .setStatsHistory(statsHistory);
+        new RealtimeSegmentConfig.Builder().setTableNameWithType(_tableNameWithType)
+            .setSegmentName(_segmentMetadata.getName()).setStreamName(_tableNameWithType)
+            .setSchema(_segmentMetadata.getSchema()).setCapacity(totalDocs).setAvgNumMultiValues(_avgMultiValues)
+            .setNoDictionaryColumns(_noDictionaryColumns).setVarLengthDictionaryColumns(_varLengthDictionaryColumns)
+            .setInvertedIndexColumns(_invertedIndexColumns).setRealtimeSegmentZKMetadata(segmentZKMetadata)
+            .setOffHeap(true).setMemoryManager(memoryManager).setStatsHistory(statsHistory);
 
     // create mutable segment impl
-    MutableSegmentImpl mutableSegmentImpl = new MutableSegmentImpl(realtimeSegmentConfigBuilder.build());
+    MutableSegmentImpl mutableSegmentImpl = new MutableSegmentImpl(realtimeSegmentConfigBuilder.build(), null);
    long memoryForConsumingSegmentPerPartition = memoryManager.getTotalAllocatedBytes();
     mutableSegmentImpl.destroy();
     FileUtils.deleteQuietly(statsFileCopy);
@@ -362,7 +368,7 @@ public class MemoryEstimator {
  private long calculateMemoryForCompletedSegmentsPerPartition(long completedSegmentSizeBytes, int numHoursToConsume,
       int retentionHours) {
 
-    int numSegmentsInMemory = (retentionHours + numHoursToConsume - 1)/numHoursToConsume;
+    int numSegmentsInMemory = (retentionHours + numHoursToConsume - 1) / numHoursToConsume;
     return completedSegmentSizeBytes * (numSegmentsInMemory - 1);
   }
 

