This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new f4e5518  Add partial upsert config and mergers (#6899)
f4e5518 is described below

commit f4e55182a8c552c8493d1693868b0c752b1a1d56
Author: deemoliu <[email protected]>
AuthorDate: Tue Jun 22 11:45:29 2021 -0700

    Add partial upsert config and mergers (#6899)
    
    Add the following pieces for partial upsert:
    - The upsert metadata and upsert data manager changes to support the 
previous-record lookup process
    - Partial upsert merger and handler framework for different partial upsert 
strategies
    - Null value handling during partial upsert
    - Make sure all segments are online before consuming
    - Table config validation
    - A partial upsert quickstart demo using MeetupEventRSVP
---
 .../manager/realtime/RealtimeTableDataManager.java |  46 ++++--
 .../org/apache/pinot/core/plan/FilterPlanNode.java |  12 +-
 .../core/plan/maker/InstancePlanMakerImplV2.java   |   2 +-
 .../query/pruner/SelectionQuerySegmentPruner.java  |   2 +-
 .../realtime/LLRealtimeSegmentDataManagerTest.java |  12 +-
 ...adataAndDictionaryAggregationPlanMakerTest.java |   4 +-
 .../pruner/SelectionQuerySegmentPrunerTest.java    |   6 +-
 .../indexsegment/immutable/EmptyIndexSegment.java  |   4 +-
 .../immutable/ImmutableSegmentImpl.java            |  18 +-
 .../indexsegment/mutable/IntermediateSegment.java  |   4 +-
 .../indexsegment/mutable/MutableSegmentImpl.java   |  65 ++++----
 .../realtime/impl/geospatial/MutableH3Index.java   |   2 +-
 .../invertedindex/RealtimeInvertedIndexReader.java |   2 +-
 .../impl/nullvalue/MutableNullValueVector.java     |   2 +-
 .../index/readers/ValidDocIndexReaderImpl.java     |   2 +-
 .../segment/local/upsert/PartialUpsertHandler.java | 142 ++++++++++++++++
 .../upsert/PartitionUpsertMetadataManager.java     | 161 +++++++++++-------
 .../pinot/segment/local/upsert/RecordLocation.java |  18 +-
 .../local/upsert/TableUpsertMetadataManager.java   |  10 +-
 .../merger/IncrementMerger.java}                   |  33 ++--
 .../local/upsert/merger/OverwriteMerger.java       |  27 +--
 .../local/upsert/merger/PartialUpsertMerger.java   |  32 ++--
 .../upsert/merger/PartialUpsertMergerFactory.java  |  31 ++--
 .../segment/local/utils/TableConfigUtils.java      |  40 ++++-
 .../mutable/MutableSegmentImplUpsertTest.java      |   4 +-
 .../local/upsert/PartialUpsertHandlerTest.java     |  81 +++++++++
 .../upsert/PartitionUpsertMetadataManagerTest.java |  94 ++++++-----
 .../segment/local/utils}/TableConfigUtilsTest.java | 181 +++++++++++++++------
 .../org/apache/pinot/segment/spi/IndexSegment.java |   4 +-
 .../spi/index}/ThreadSafeMutableRoaringBitmap.java |   2 +-
 .../pinot/spi/config/table/UpsertConfig.java       |  28 +++-
 .../apache/pinot/spi/data/readers/GenericRow.java  |  10 +-
 .../pinot/spi/config/table/UpsertConfigTest.java   |  18 +-
 pinot-tools/pom.xml                                |  11 ++
 .../pinot/tools/PartialUpsertQuickStart.java       | 119 ++++++++++++++
 ...t_partial_meetupRsvp_realtime_table_config.json |  53 ++++++
 36 files changed, 944 insertions(+), 338 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index b1a9e2d..4cb8cca 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -54,17 +54,18 @@ import 
org.apache.pinot.segment.local.data.manager.SegmentDataManager;
 import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
 import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
 import 
org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentStatsHistory;
-import 
org.apache.pinot.segment.local.realtime.impl.ThreadSafeMutableRoaringBitmap;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils;
 import 
org.apache.pinot.segment.local.segment.index.loader.V3RemoveIndexException;
 import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
 import 
org.apache.pinot.segment.local.segment.virtualcolumn.VirtualColumnProviderFactory;
+import org.apache.pinot.segment.local.upsert.PartialUpsertHandler;
 import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
 import org.apache.pinot.segment.local.upsert.TableUpsertMetadataManager;
 import org.apache.pinot.segment.local.utils.IngestionUtils;
 import org.apache.pinot.segment.local.utils.SchemaUtils;
 import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.index.ThreadSafeMutableRoaringBitmap;
 import org.apache.pinot.spi.config.table.IndexingConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.UpsertConfig;
@@ -158,9 +159,18 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
     Preconditions.checkState(tableConfig != null, "Failed to find table config 
for table: %s", _tableNameWithType);
     _upsertMode = tableConfig.getUpsertMode();
     if (isUpsertEnabled()) {
+      UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
+      assert upsertConfig != null;
       Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, 
_tableNameWithType);
       Preconditions.checkState(schema != null, "Failed to find schema for 
table: %s", _tableNameWithType);
-      _tableUpsertMetadataManager = new 
TableUpsertMetadataManager(_tableNameWithType, _serverMetrics);
+
+      PartialUpsertHandler partialUpsertHandler = null;
+      if (isPartialUpsertEnabled()) {
+        partialUpsertHandler =
+            new PartialUpsertHandler(_helixManager, _tableNameWithType, 
upsertConfig.getPartialUpsertStrategies());
+      }
+      _tableUpsertMetadataManager =
+          new TableUpsertMetadataManager(_tableNameWithType, _serverMetrics, 
partialUpsertHandler);
       _primaryKeyColumns = schema.getPrimaryKeyColumns();
       Preconditions.checkState(!CollectionUtils.isEmpty(_primaryKeyColumns),
           "Primary key columns must be configured for upsert");
@@ -232,6 +242,10 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
     return _upsertMode != UpsertConfig.Mode.NONE;
   }
 
+  public boolean isPartialUpsertEnabled() {
+    return _upsertMode == UpsertConfig.Mode.PARTIAL;
+  }
+
   /*
    * This call comes in one of two ways:
    * For HL Segments:
@@ -300,11 +314,12 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
       // The segment is uploaded to an upsert enabled realtime table. Download 
the segment and load.
       Preconditions.checkArgument(realtimeSegmentZKMetadata instanceof 
LLCRealtimeSegmentZKMetadata,
           "Upload segment is not LLC segment");
-      String downURL = 
((LLCRealtimeSegmentZKMetadata)realtimeSegmentZKMetadata).getDownloadUrl();
-      Preconditions.checkNotNull(downURL, "Upload segment metadata has no 
download url");
-      downloadSegmentFromDeepStore(segmentName, indexLoadingConfig, downURL);
-      _logger.info("Downloaded, untarred and add segment {} of table {} from 
{}", segmentName, tableConfig.getTableName(),
-          downURL);
+      String downloadUrl = ((LLCRealtimeSegmentZKMetadata) 
realtimeSegmentZKMetadata).getDownloadUrl();
+      Preconditions.checkNotNull(downloadUrl, "Upload segment metadata has no 
download url");
+      downloadSegmentFromDeepStore(segmentName, indexLoadingConfig, 
downloadUrl);
+      _logger
+          .info("Downloaded, untarred and add segment {} of table {} from {}", 
segmentName, tableConfig.getTableName(),
+              downloadUrl);
       return;
     }
 
@@ -354,17 +369,20 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
   }
 
   private void handleUpsert(ImmutableSegmentImpl immutableSegment) {
+    String segmentName = immutableSegment.getSegmentName();
+    int partitionGroupId = SegmentUtils
+        .getRealtimeSegmentPartitionId(segmentName, _tableNameWithType, 
_helixManager, _primaryKeyColumns.get(0));
+    PartitionUpsertMetadataManager partitionUpsertMetadataManager =
+        
_tableUpsertMetadataManager.getOrCreatePartitionManager(partitionGroupId);
+    ThreadSafeMutableRoaringBitmap validDocIds = new 
ThreadSafeMutableRoaringBitmap();
+    immutableSegment.enableUpsert(partitionUpsertMetadataManager, validDocIds);
+
     Map<String, PinotSegmentColumnReader> columnToReaderMap = new HashMap<>();
     for (String primaryKeyColumn : _primaryKeyColumns) {
       columnToReaderMap.put(primaryKeyColumn, new 
PinotSegmentColumnReader(immutableSegment, primaryKeyColumn));
     }
     columnToReaderMap.put(_timeColumnName, new 
PinotSegmentColumnReader(immutableSegment, _timeColumnName));
     int numTotalDocs = immutableSegment.getSegmentMetadata().getTotalDocs();
-    String segmentName = immutableSegment.getSegmentName();
-    int partitionGroupId = SegmentUtils
-        .getRealtimeSegmentPartitionId(segmentName, this.getTableName(), 
_helixManager, _primaryKeyColumns.get(0));
-    PartitionUpsertMetadataManager partitionUpsertMetadataManager =
-        
_tableUpsertMetadataManager.getOrCreatePartitionManager(partitionGroupId);
     int numPrimaryKeyColumns = _primaryKeyColumns.size();
     Iterator<PartitionUpsertMetadataManager.RecordInfo> recordInfoIterator =
         new Iterator<PartitionUpsertMetadataManager.RecordInfo>() {
@@ -392,9 +410,7 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
             return new PartitionUpsertMetadataManager.RecordInfo(primaryKey, 
_docId++, timestamp);
           }
         };
-    ThreadSafeMutableRoaringBitmap validDocIds =
-        partitionUpsertMetadataManager.addSegment(segmentName, 
recordInfoIterator);
-    immutableSegment.enableUpsert(partitionUpsertMetadataManager, validDocIds);
+    partitionUpsertMetadataManager.addSegment(immutableSegment, 
recordInfoIterator);
   }
 
   public void downloadAndReplaceSegment(String segmentName, 
LLCRealtimeSegmentZKMetadata llcSegmentMetadata,
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java 
b/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java
index 2ad5c26..4418e1d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java
@@ -49,9 +49,9 @@ import org.apache.pinot.core.util.QueryOptions;
 import 
org.apache.pinot.segment.local.segment.index.datasource.MutableDataSource;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.datasource.DataSource;
+import org.apache.pinot.segment.spi.index.ThreadSafeMutableRoaringBitmap;
 import org.apache.pinot.segment.spi.index.reader.JsonIndexReader;
 import org.apache.pinot.segment.spi.index.reader.NullValueVectorReader;
-import org.apache.pinot.segment.spi.index.reader.ValidDocIndexReader;
 import org.apache.pinot.spi.exception.BadQueryRequestException;
 
 
@@ -71,23 +71,23 @@ public class FilterPlanNode implements PlanNode {
   @Override
   public BaseFilterOperator run() {
     FilterContext filter = _queryContext.getFilter();
-    ValidDocIndexReader validDocIndexReader = _indexSegment.getValidDocIndex();
+    ThreadSafeMutableRoaringBitmap validDocIds = 
_indexSegment.getValidDocIds();
     boolean upsertSkipped = false;
     if (_queryContext.getQueryOptions() != null) {
       upsertSkipped = new 
QueryOptions(_queryContext.getQueryOptions()).isSkipUpsert();
     }
     if (filter != null) {
       BaseFilterOperator filterOperator = constructPhysicalOperator(filter, 
_queryContext.getDebugOptions());
-      if (validDocIndexReader != null && !upsertSkipped) {
+      if (validDocIds != null && !upsertSkipped) {
         BaseFilterOperator validDocFilter =
-            new 
BitmapBasedFilterOperator(validDocIndexReader.getValidDocBitmap(), false, 
_numDocs);
+            new 
BitmapBasedFilterOperator(validDocIds.getMutableRoaringBitmap(), false, 
_numDocs);
         return 
FilterOperatorUtils.getAndFilterOperator(Arrays.asList(filterOperator, 
validDocFilter), _numDocs,
             _queryContext.getDebugOptions());
       } else {
         return filterOperator;
       }
-    } else if (validDocIndexReader != null && !upsertSkipped) {
-      return new 
BitmapBasedFilterOperator(validDocIndexReader.getValidDocBitmap(), false, 
_numDocs);
+    } else if (validDocIds != null && !upsertSkipped) {
+      return new 
BitmapBasedFilterOperator(validDocIds.getMutableRoaringBitmap(), false, 
_numDocs);
     } else {
       return new MatchAllFilterOperator(_numDocs);
     }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
index 3f94d80..3fb5a2a 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
@@ -175,7 +175,7 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
 
         // Use metadata/dictionary to solve the query if possible
         // NOTE: Skip the segment with valid doc index because the valid doc 
index is equivalent to a filter.
-        if (queryContext.getFilter() == null && 
indexSegment.getValidDocIndex() == null) {
+        if (queryContext.getFilter() == null && indexSegment.getValidDocIds() 
== null) {
           if (isFitForMetadataBasedPlan(queryContext)) {
             return new MetadataBasedAggregationPlanNode(indexSegment, 
queryContext);
           } else if (isFitForDictionaryBasedPlan(queryContext, indexSegment)) {
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SelectionQuerySegmentPruner.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SelectionQuerySegmentPruner.java
index 0d40c97..7eea40b 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SelectionQuerySegmentPruner.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SelectionQuerySegmentPruner.java
@@ -74,7 +74,7 @@ public class SelectionQuerySegmentPruner implements 
SegmentPruner {
     }
 
     // Skip pruning segments for upsert table because valid doc index is 
equivalent to a filter
-    if (segments.get(0).getValidDocIndex() != null) {
+    if (segments.get(0).getValidDocIds() != null) {
       return segments;
     }
 
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
index 5f4aff4..6555bfa 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
@@ -47,7 +47,6 @@ import 
org.apache.pinot.segment.local.data.manager.TableDataManagerConfig;
 import org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl;
 import 
org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentStatsHistory;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
-import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
 import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
@@ -318,8 +317,8 @@ public class LLRealtimeSegmentDataManagerTest {
     segmentDataManager._consumeOffsets.add(catchupOffset); // Offset after 
catchup
     final SegmentCompletionProtocol.Response holdResponse1 = new 
SegmentCompletionProtocol.Response(
         new SegmentCompletionProtocol.Response.Params()
-            
.withStatus(SegmentCompletionProtocol.ControllerResponseStatus.HOLD).
-            withStreamPartitionMsgOffset(firstOffset.toString()));
+            
.withStatus(SegmentCompletionProtocol.ControllerResponseStatus.HOLD)
+            .withStreamPartitionMsgOffset(firstOffset.toString()));
     final SegmentCompletionProtocol.Response catchupResponse = new 
SegmentCompletionProtocol.Response(
         new SegmentCompletionProtocol.Response.Params()
             
.withStatus(SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP)
@@ -382,8 +381,8 @@ public class LLRealtimeSegmentDataManagerTest {
     final LongMsgOffset endOffset = new LongMsgOffset(_startOffsetValue + 500);
     segmentDataManager._consumeOffsets.add(endOffset);
     SegmentCompletionProtocol.Response.Params params = new 
SegmentCompletionProtocol.Response.Params();
-    params.withStreamPartitionMsgOffset(endOffset.toString()).
-        withStatus(SegmentCompletionProtocol.ControllerResponseStatus.KEEP);
+    params.withStreamPartitionMsgOffset(endOffset.toString())
+        .withStatus(SegmentCompletionProtocol.ControllerResponseStatus.KEEP);
     final SegmentCompletionProtocol.Response keepResponse = new 
SegmentCompletionProtocol.Response(params);
     segmentDataManager._responses.add(keepResponse);
 
@@ -816,8 +815,7 @@ public class LLRealtimeSegmentDataManagerTest {
         throws Exception {
       super(segmentZKMetadata, tableConfig, realtimeTableDataManager, 
resourceDataDir,
           new IndexLoadingConfig(makeInstanceDataManagerConfig(), 
tableConfig), schema, llcSegmentName,
-          semaphoreMap.get(llcSegmentName.getPartitionGroupId()), 
serverMetrics,
-          new PartitionUpsertMetadataManager("testTable_REALTIME", 0, 
serverMetrics));
+          semaphoreMap.get(llcSegmentName.getPartitionGroupId()), 
serverMetrics, null);
       _state = LLRealtimeSegmentDataManager.class.getDeclaredField("_state");
       _state.setAccessible(true);
       _shouldStop = 
LLRealtimeSegmentDataManager.class.getDeclaredField("_shouldStop");
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
index b1e6e81..956ad31 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
@@ -36,12 +36,12 @@ import 
org.apache.pinot.core.query.request.context.QueryContext;
 import 
org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
 import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
 import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
-import 
org.apache.pinot.segment.local.realtime.impl.ThreadSafeMutableRoaringBitmap;
 import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
 import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
 import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver;
+import org.apache.pinot.segment.spi.index.ThreadSafeMutableRoaringBitmap;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.FieldSpec;
@@ -124,7 +124,7 @@ public class MetadataAndDictionaryAggregationPlanMakerTest {
     ServerMetrics serverMetrics = Mockito.mock(ServerMetrics.class);
     _upsertIndexSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, 
SEGMENT_NAME), ReadMode.heap);
     ((ImmutableSegmentImpl) _upsertIndexSegment)
-        .enableUpsert(new PartitionUpsertMetadataManager("testTable_REALTIME", 
0, serverMetrics),
+        .enableUpsert(new PartitionUpsertMetadataManager("testTable_REALTIME", 
0, serverMetrics, null),
             new ThreadSafeMutableRoaringBitmap());
   }
 
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/SelectionQuerySegmentPrunerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/SelectionQuerySegmentPrunerTest.java
index d7a01b0..c7f7d57 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/SelectionQuerySegmentPrunerTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/SelectionQuerySegmentPrunerTest.java
@@ -27,7 +27,7 @@ import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.segment.spi.datasource.DataSource;
 import org.apache.pinot.segment.spi.datasource.DataSourceMetadata;
-import org.apache.pinot.segment.spi.index.reader.ValidDocIndexReader;
+import org.apache.pinot.segment.spi.index.ThreadSafeMutableRoaringBitmap;
 import org.testng.annotations.Test;
 
 import static org.mockito.Mockito.mock;
@@ -217,8 +217,8 @@ public class SelectionQuerySegmentPrunerTest {
     when(indexSegment.getSegmentMetadata()).thenReturn(segmentMetadata);
     when(segmentMetadata.getTotalDocs()).thenReturn(totalDocs);
     if (upsert) {
-      ValidDocIndexReader validDocIndex = mock(ValidDocIndexReader.class);
-      when(indexSegment.getValidDocIndex()).thenReturn(validDocIndex);
+      ThreadSafeMutableRoaringBitmap validDocIds = 
mock(ThreadSafeMutableRoaringBitmap.class);
+      when(indexSegment.getValidDocIds()).thenReturn(validDocIds);
     }
     return indexSegment;
   }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/EmptyIndexSegment.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/EmptyIndexSegment.java
index c5d6b6e..d9c56c5 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/EmptyIndexSegment.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/EmptyIndexSegment.java
@@ -25,12 +25,12 @@ import javax.annotation.Nullable;
 import org.apache.pinot.segment.local.segment.index.datasource.EmptyDataSource;
 import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.datasource.DataSource;
+import org.apache.pinot.segment.spi.index.ThreadSafeMutableRoaringBitmap;
 import org.apache.pinot.segment.spi.index.metadata.ColumnMetadata;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.segment.spi.index.reader.Dictionary;
 import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
 import org.apache.pinot.segment.spi.index.reader.InvertedIndexReader;
-import org.apache.pinot.segment.spi.index.reader.ValidDocIndexReader;
 import org.apache.pinot.segment.spi.index.startree.StarTreeV2;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.slf4j.Logger;
@@ -89,7 +89,7 @@ public class EmptyIndexSegment implements ImmutableSegment {
 
   @Nullable
   @Override
-  public ValidDocIndexReader getValidDocIndex() {
+  public ThreadSafeMutableRoaringBitmap getValidDocIds() {
     return null;
   }
 
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
index 653349e..4f110df 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
@@ -24,21 +24,19 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import javax.annotation.Nullable;
-import 
org.apache.pinot.segment.local.realtime.impl.ThreadSafeMutableRoaringBitmap;
 import 
org.apache.pinot.segment.local.segment.index.datasource.ImmutableDataSource;
-import 
org.apache.pinot.segment.local.segment.index.readers.ValidDocIndexReaderImpl;
 import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
 import org.apache.pinot.segment.local.startree.v2.store.StarTreeIndexContainer;
 import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
 import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.datasource.DataSource;
+import org.apache.pinot.segment.spi.index.ThreadSafeMutableRoaringBitmap;
 import org.apache.pinot.segment.spi.index.column.ColumnIndexContainer;
 import org.apache.pinot.segment.spi.index.metadata.ColumnMetadata;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.segment.spi.index.reader.Dictionary;
 import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
 import org.apache.pinot.segment.spi.index.reader.InvertedIndexReader;
-import org.apache.pinot.segment.spi.index.reader.ValidDocIndexReader;
 import org.apache.pinot.segment.spi.index.startree.StarTreeV2;
 import org.apache.pinot.segment.spi.store.SegmentDirectory;
 import org.apache.pinot.spi.data.readers.GenericRow;
@@ -57,7 +55,6 @@ public class ImmutableSegmentImpl implements ImmutableSegment 
{
   // For upsert
   private PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
   private ThreadSafeMutableRoaringBitmap _validDocIds;
-  private ValidDocIndexReader _validDocIndex;
   private PinotSegmentRecordReader _pinotSegmentRecordReader;
 
   public ImmutableSegmentImpl(SegmentDirectory segmentDirectory, 
SegmentMetadataImpl segmentMetadata,
@@ -76,7 +73,6 @@ public class ImmutableSegmentImpl implements ImmutableSegment 
{
       ThreadSafeMutableRoaringBitmap validDocIds) {
     _partitionUpsertMetadataManager = partitionUpsertMetadataManager;
     _validDocIds = validDocIds;
-    _validDocIndex = new ValidDocIndexReaderImpl(validDocIds);
   }
 
   @Override
@@ -140,6 +136,11 @@ public class ImmutableSegmentImpl implements 
ImmutableSegment {
   public void destroy() {
     String segmentName = getSegmentName();
     LOGGER.info("Trying to destroy segment : {}", segmentName);
+
+    // Remove the upsert metadata before closing the readers
+    if (_partitionUpsertMetadataManager != null) {
+      _partitionUpsertMetadataManager.removeSegment(this);
+    }
     for (Map.Entry<String, ColumnIndexContainer> entry : 
_indexContainerMap.entrySet()) {
       try {
         entry.getValue().close();
@@ -159,9 +160,6 @@ public class ImmutableSegmentImpl implements 
ImmutableSegment {
         LOGGER.error("Failed to close star-tree. Continuing with error.", e);
       }
     }
-    if (_partitionUpsertMetadataManager != null) {
-      _partitionUpsertMetadataManager.removeSegment(segmentName, _validDocIds);
-    }
     if (_pinotSegmentRecordReader != null) {
       try {
         _pinotSegmentRecordReader.close();
@@ -178,8 +176,8 @@ public class ImmutableSegmentImpl implements 
ImmutableSegment {
 
   @Nullable
   @Override
-  public ValidDocIndexReader getValidDocIndex() {
-    return _validDocIndex;
+  public ThreadSafeMutableRoaringBitmap getValidDocIds() {
+    return _validDocIds;
   }
 
   @Override
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/IntermediateSegment.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/IntermediateSegment.java
index ee0bea4..875811e 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/IntermediateSegment.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/IntermediateSegment.java
@@ -42,9 +42,9 @@ import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
 import org.apache.pinot.segment.spi.datasource.DataSource;
+import org.apache.pinot.segment.spi.index.ThreadSafeMutableRoaringBitmap;
 import org.apache.pinot.segment.spi.index.reader.MutableDictionary;
 import org.apache.pinot.segment.spi.index.reader.MutableForwardIndex;
-import org.apache.pinot.segment.spi.index.reader.ValidDocIndexReader;
 import org.apache.pinot.segment.spi.index.startree.StarTreeV2;
 import org.apache.pinot.segment.spi.partition.PartitionFunction;
 import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory;
@@ -219,7 +219,7 @@ public class IntermediateSegment implements MutableSegment {
 
   @Nullable
   @Override
-  public ValidDocIndexReader getValidDocIndex() {
+  public ThreadSafeMutableRoaringBitmap getValidDocIds() {
     return null;
   }
 
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
index 6a09d63..96f71fe 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
@@ -40,7 +40,6 @@ import org.apache.pinot.common.metrics.ServerMetrics;
 import 
org.apache.pinot.segment.local.io.readerwriter.PinotDataBufferMemoryManager;
 import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig;
 import 
org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentStatsHistory;
-import 
org.apache.pinot.segment.local.realtime.impl.ThreadSafeMutableRoaringBitmap;
 import 
org.apache.pinot.segment.local.realtime.impl.dictionary.BaseOffHeapMutableDictionary;
 import 
org.apache.pinot.segment.local.realtime.impl.dictionary.MutableDictionaryFactory;
 import 
org.apache.pinot.segment.local.realtime.impl.forward.FixedByteMVMutableForwardIndex;
@@ -54,7 +53,6 @@ import 
org.apache.pinot.segment.local.realtime.impl.json.MutableJsonIndex;
 import 
org.apache.pinot.segment.local.realtime.impl.nullvalue.MutableNullValueVector;
 import 
org.apache.pinot.segment.local.segment.index.datasource.ImmutableDataSource;
 import 
org.apache.pinot.segment.local.segment.index.datasource.MutableDataSource;
-import 
org.apache.pinot.segment.local.segment.index.readers.ValidDocIndexReaderImpl;
 import 
org.apache.pinot.segment.local.segment.virtualcolumn.VirtualColumnContext;
 import 
org.apache.pinot.segment.local.segment.virtualcolumn.VirtualColumnProvider;
 import 
org.apache.pinot.segment.local.segment.virtualcolumn.VirtualColumnProviderFactory;
@@ -67,13 +65,13 @@ import org.apache.pinot.segment.spi.MutableSegment;
 import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.segment.spi.datasource.DataSource;
+import org.apache.pinot.segment.spi.index.ThreadSafeMutableRoaringBitmap;
 import org.apache.pinot.segment.spi.index.creator.H3IndexConfig;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.segment.spi.index.reader.BloomFilterReader;
 import org.apache.pinot.segment.spi.index.reader.InvertedIndexReader;
 import org.apache.pinot.segment.spi.index.reader.MutableDictionary;
 import org.apache.pinot.segment.spi.index.reader.MutableForwardIndex;
-import org.apache.pinot.segment.spi.index.reader.ValidDocIndexReader;
 import org.apache.pinot.segment.spi.index.startree.StarTreeV2;
 import org.apache.pinot.segment.spi.partition.PartitionFunction;
 import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
@@ -162,7 +160,6 @@ public class MutableSegmentImpl implements MutableSegment {
   //        consumption with newer timestamp (late event in consuming 
segment), the record location will be updated, but
   //        the valid doc ids won't be updated.
   private final ThreadSafeMutableRoaringBitmap _validDocIds;
-  private final ValidDocIndexReader _validDocIndex;
 
   public MutableSegmentImpl(RealtimeSegmentConfig config, @Nullable 
ServerMetrics serverMetrics) {
     _serverMetrics = serverMetrics;
@@ -373,13 +370,12 @@ public class MutableSegmentImpl implements MutableSegment 
{
     // init upsert-related data structure
     _upsertMode = config.getUpsertMode();
     if (isUpsertEnabled()) {
+      Preconditions.checkState(!_aggregateMetrics, "Metrics aggregation and 
upsert cannot be enabled together");
       _partitionUpsertMetadataManager = 
config.getPartitionUpsertMetadataManager();
       _validDocIds = new ThreadSafeMutableRoaringBitmap();
-      _validDocIndex = new ValidDocIndexReaderImpl(_validDocIds);
     } else {
       _partitionUpsertMetadataManager = null;
       _validDocIds = null;
-      _validDocIndex = null;
     }
   }
 
@@ -470,28 +466,32 @@ public class MutableSegmentImpl implements MutableSegment 
{
   @Override
   public boolean index(GenericRow row, @Nullable RowMetadata rowMetadata)
       throws IOException {
-    // Update dictionary first
-    updateDictionary(row);
-
-    // If metrics aggregation is enabled and if the dimension values were 
already seen, this will return existing docId,
-    // else this will return a new docId.
-    int docId = getOrCreateDocId();
-
     boolean canTakeMore;
-    if (docId == _numDocsIndexed) {
-      // New row
+    if (isUpsertEnabled()) {
+      row = handleUpsert(row, _numDocsIndexed);
+
+      updateDictionary(row);
       addNewRow(row);
       // Update number of documents indexed at last to make the latest row 
queryable
       canTakeMore = _numDocsIndexed++ < _capacity;
-
-      if (isUpsertEnabled()) {
-        handleUpsert(row, docId);
-      }
     } else {
-      Preconditions.checkArgument(!isUpsertEnabled(), "metrics aggregation 
cannot be used with upsert");
-      assert _aggregateMetrics;
-      aggregateMetrics(row, docId);
-      canTakeMore = true;
+      // Update dictionary first
+      updateDictionary(row);
+
+      // If metrics aggregation is enabled and if the dimension values were 
already seen, this will return existing
+      // docId, else this will return a new docId.
+      int docId = getOrCreateDocId();
+
+      if (docId == _numDocsIndexed) {
+        // New row
+        addNewRow(row);
+        // Update number of documents indexed at last to make the latest row 
queryable
+        canTakeMore = _numDocsIndexed++ < _capacity;
+      } else {
+        assert _aggregateMetrics;
+        aggregateMetrics(row, docId);
+        canTakeMore = true;
+      }
     }
 
     // Update last indexed time and latest ingestion time
@@ -507,14 +507,13 @@ public class MutableSegmentImpl implements MutableSegment 
{
     return _upsertMode != UpsertConfig.Mode.NONE;
   }
 
-  private void handleUpsert(GenericRow row, int docId) {
+  private GenericRow handleUpsert(GenericRow row, int docId) {
     PrimaryKey primaryKey = row.getPrimaryKey(_schema.getPrimaryKeyColumns());
     Object timeValue = row.getValue(_timeColumnName);
     Preconditions.checkArgument(timeValue instanceof Comparable, "time column 
shall be comparable");
     long timestamp = IngestionUtils.extractTimeValue((Comparable) timeValue);
-    _partitionUpsertMetadataManager
-        .updateRecord(_segmentName, new 
PartitionUpsertMetadataManager.RecordInfo(primaryKey, docId, timestamp),
-            _validDocIds);
+    return _partitionUpsertMetadataManager
+        .updateRecord(this, new 
PartitionUpsertMetadataManager.RecordInfo(primaryKey, docId, timestamp), row);
   }
 
   private void updateDictionary(GenericRow row) {
@@ -758,8 +757,8 @@ public class MutableSegmentImpl implements MutableSegment {
 
   @Nullable
   @Override
-  public ValidDocIndexReader getValidDocIndex() {
-    return _validDocIndex;
+  public ThreadSafeMutableRoaringBitmap getValidDocIds() {
+    return _validDocIds;
   }
 
   @Override
@@ -937,6 +936,14 @@ public class MutableSegmentImpl implements MutableSegment {
     return segmentName + ":" + columnName + indexType;
   }
 
+  /**
+   * Helper function that returns the docId, depending on the following scenarios.
+   * <ul>
+   *   <li> If metrics aggregation is enabled and if the dimension values were 
already seen, return the existing docId </li>
+   *   <li> Else, this function will create and return a new docId. </li>
+   * </ul>
+   *
+   */
   private int getOrCreateDocId() {
     if (!_aggregateMetrics) {
       return _numDocsIndexed;
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/geospatial/MutableH3Index.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/geospatial/MutableH3Index.java
index b026093..7af31e7 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/geospatial/MutableH3Index.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/geospatial/MutableH3Index.java
@@ -22,8 +22,8 @@ import com.google.common.base.Preconditions;
 import java.io.IOException;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import 
org.apache.pinot.segment.local.realtime.impl.ThreadSafeMutableRoaringBitmap;
 import org.apache.pinot.segment.local.utils.H3Utils;
+import org.apache.pinot.segment.spi.index.ThreadSafeMutableRoaringBitmap;
 import org.apache.pinot.segment.spi.index.reader.H3IndexReader;
 import org.apache.pinot.segment.spi.index.reader.H3IndexResolution;
 import org.locationtech.jts.geom.Coordinate;
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeInvertedIndexReader.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeInvertedIndexReader.java
index 98056b3..d2d3cf1 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeInvertedIndexReader.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeInvertedIndexReader.java
@@ -21,7 +21,7 @@ package 
org.apache.pinot.segment.local.realtime.impl.invertedindex;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import 
org.apache.pinot.segment.local.realtime.impl.ThreadSafeMutableRoaringBitmap;
+import org.apache.pinot.segment.spi.index.ThreadSafeMutableRoaringBitmap;
 import org.apache.pinot.segment.spi.index.reader.InvertedIndexReader;
 import org.roaringbitmap.buffer.MutableRoaringBitmap;
 
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/nullvalue/MutableNullValueVector.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/nullvalue/MutableNullValueVector.java
index b3ff3e2..5558857 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/nullvalue/MutableNullValueVector.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/nullvalue/MutableNullValueVector.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pinot.segment.local.realtime.impl.nullvalue;
 
-import 
org.apache.pinot.segment.local.realtime.impl.ThreadSafeMutableRoaringBitmap;
+import org.apache.pinot.segment.spi.index.ThreadSafeMutableRoaringBitmap;
 import org.apache.pinot.segment.spi.index.reader.NullValueVectorReader;
 import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
 
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/ValidDocIndexReaderImpl.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/ValidDocIndexReaderImpl.java
index f09ab06..8a2089f 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/ValidDocIndexReaderImpl.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/ValidDocIndexReaderImpl.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pinot.segment.local.segment.index.readers;
 
-import 
org.apache.pinot.segment.local.realtime.impl.ThreadSafeMutableRoaringBitmap;
+import org.apache.pinot.segment.spi.index.ThreadSafeMutableRoaringBitmap;
 import org.apache.pinot.segment.spi.index.reader.ValidDocIndexReader;
 import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
 
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java
new file mode 100644
index 0000000..4822339
--- /dev/null
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.upsert;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.LiveInstance;
+import org.apache.pinot.segment.local.upsert.merger.PartialUpsertMerger;
+import org.apache.pinot.segment.local.upsert.merger.PartialUpsertMergerFactory;
+import org.apache.pinot.spi.config.table.UpsertConfig;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Handler for partial-upsert.
+ */
+public class PartialUpsertHandler {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PartialUpsertHandler.class);
+
+  // _column2Mergers maintains the mapping of merge strategies per columns.
+  private final Map<String, PartialUpsertMerger> _column2Mergers = new 
HashMap<>();
+
+  private final HelixManager _helixManager;
+  private final String _tableNameWithType;
+  private boolean _allSegmentsLoaded;
+
+  public PartialUpsertHandler(HelixManager helixManager, String 
tableNameWithType,
+      Map<String, UpsertConfig.Strategy> partialUpsertStrategies) {
+    _helixManager = helixManager;
+    _tableNameWithType = tableNameWithType;
+    for (Map.Entry<String, UpsertConfig.Strategy> entry : 
partialUpsertStrategies.entrySet()) {
+      _column2Mergers.put(entry.getKey(), 
PartialUpsertMergerFactory.getMerger(entry.getValue()));
+    }
+  }
+
+  /**
+   * Returns {@code true} if all segments assigned to the current instance are 
loaded, {@code false} otherwise.
+   * Consuming segment should perform this check to ensure all previous 
records are loaded before inserting new records.
+   */
+  public synchronized boolean isAllSegmentsLoaded() {
+    if (_allSegmentsLoaded) {
+      return true;
+    }
+
+    HelixDataAccessor dataAccessor = _helixManager.getHelixDataAccessor();
+    PropertyKey.Builder keyBuilder = dataAccessor.keyBuilder();
+    IdealState idealState = 
dataAccessor.getProperty(keyBuilder.idealStates(_tableNameWithType));
+    if (idealState == null) {
+      LOGGER.warn("Failed to find ideal state for table: {}", 
_tableNameWithType);
+      return false;
+    }
+    String instanceName = _helixManager.getInstanceName();
+    LiveInstance liveInstance = 
dataAccessor.getProperty(keyBuilder.liveInstance(instanceName));
+    if (liveInstance == null) {
+      LOGGER.warn("Failed to find live instance for instance: {}", 
instanceName);
+      return false;
+    }
+    String sessionId = liveInstance.getEphemeralOwner();
+    CurrentState currentState =
+        dataAccessor.getProperty(keyBuilder.currentState(instanceName, 
sessionId, _tableNameWithType));
+    if (currentState == null) {
+      LOGGER.warn("Failed to find current state for instance: {}, sessionId: 
{}, table: {}", instanceName, sessionId,
+          _tableNameWithType);
+      return false;
+    }
+
+    // Check if ideal state and current state matches for all segments 
assigned to the current instance
+    Map<String, Map<String, String>> idealStatesMap = 
idealState.getRecord().getMapFields();
+    Map<String, String> currentStateMap = currentState.getPartitionStateMap();
+    for (Map.Entry<String, Map<String, String>> entry : 
idealStatesMap.entrySet()) {
+      String segmentName = entry.getKey();
+      Map<String, String> instanceStateMap = entry.getValue();
+      String expectedState = instanceStateMap.get(instanceName);
+      // Only track ONLINE segments assigned to the current instance
+      if (!SegmentStateModel.ONLINE.equals(expectedState)) {
+        continue;
+      }
+      String actualState = currentStateMap.get(segmentName);
+      if (!SegmentStateModel.ONLINE.equals(actualState)) {
+        if (SegmentStateModel.ERROR.equals(actualState)) {
+          LOGGER
+              .error("Find ERROR segment: {}, table: {}, expected: {}", 
segmentName, _tableNameWithType, expectedState);
+        } else {
+          LOGGER.info("Find unloaded segment: {}, table: {}, expected: {}, 
actual: {}", segmentName, _tableNameWithType,
+              expectedState, actualState);
+        }
+        return false;
+      }
+    }
+
+    LOGGER.info("All segments loaded for table: {}", _tableNameWithType);
+    _allSegmentsLoaded = true;
+    return true;
+  }
+
+  /**
+   * Merges 2 records and returns the merged record.
+   *
+   * @param previousRecord the last derived full record during ingestion.
+   * @param newRecord the new consumed record.
+   * @return a new row after merge
+   */
+  public GenericRow merge(GenericRow previousRecord, GenericRow newRecord) {
+    for (Map.Entry<String, PartialUpsertMerger> entry : 
_column2Mergers.entrySet()) {
+      String column = entry.getKey();
+      if (!previousRecord.isNullValue(column)) {
+        if (newRecord.isNullValue(column)) {
+          newRecord.putValue(column, previousRecord.getValue(column));
+          newRecord.removeNullValueField(column);
+        } else {
+          newRecord
+              .putValue(column, 
entry.getValue().merge(previousRecord.getValue(column), 
newRecord.getValue(column)));
+        }
+      }
+    }
+    return newRecord;
+  }
+}
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
index 8e02f85..335d658 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
@@ -21,11 +21,14 @@ package org.apache.pinot.segment.local.upsert;
 import com.google.common.annotations.VisibleForTesting;
 import java.util.Iterator;
 import java.util.concurrent.ConcurrentHashMap;
+import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.pinot.common.metrics.ServerGauge;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.utils.LLCSegmentName;
-import 
org.apache.pinot.segment.local.realtime.impl.ThreadSafeMutableRoaringBitmap;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.index.ThreadSafeMutableRoaringBitmap;
+import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.PrimaryKey;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -60,92 +63,117 @@ public class PartitionUpsertMetadataManager {
   private final String _tableNameWithType;
   private final int _partitionId;
   private final ServerMetrics _serverMetrics;
+  private final PartialUpsertHandler _partialUpsertHandler;
 
-  public PartitionUpsertMetadataManager(String tableNameWithType, int 
partitionId, ServerMetrics serverMetrics) {
+  // TODO(upsert): consider an off-heap KV store to persist this mapping to 
improve the recovery speed.
+  @VisibleForTesting
+  final ConcurrentHashMap<PrimaryKey, RecordLocation> 
_primaryKeyToRecordLocationMap = new ConcurrentHashMap<>();
+
+  // Reused for reading previous record during partial upsert
+  private final GenericRow _reuse = new GenericRow();
+  // Stores the result of updateRecord()
+  private GenericRow _result;
+
+  public PartitionUpsertMetadataManager(String tableNameWithType, int 
partitionId, ServerMetrics serverMetrics,
+      @Nullable PartialUpsertHandler partialUpsertHandler) {
     _tableNameWithType = tableNameWithType;
     _partitionId = partitionId;
     _serverMetrics = serverMetrics;
+    _partialUpsertHandler = partialUpsertHandler;
   }
 
-  public ConcurrentHashMap<PrimaryKey, RecordLocation> 
getPrimaryKeyToRecordLocationMap() {
-    return _primaryKeyToRecordLocationMap;
-  }
-
-  // TODO(upset): consider an off-heap KV store to persist this index to 
improve the recovery speed.
-  @VisibleForTesting
-  final ConcurrentHashMap<PrimaryKey, RecordLocation> 
_primaryKeyToRecordLocationMap = new ConcurrentHashMap<>();
-
   /**
-   * Initializes the upsert metadata for the given immutable segment, returns 
the valid doc ids for the segment.
+   * Initializes the upsert metadata for the given immutable segment.
    */
-  public ThreadSafeMutableRoaringBitmap addSegment(String segmentName, 
Iterator<RecordInfo> recordInfoIterator) {
+  public void addSegment(IndexSegment segment, Iterator<RecordInfo> 
recordInfoIterator) {
+    String segmentName = segment.getSegmentName();
     LOGGER.info("Adding upsert metadata for segment: {}", segmentName);
+    ThreadSafeMutableRoaringBitmap validDocIds = segment.getValidDocIds();
+    assert validDocIds != null;
 
-    ThreadSafeMutableRoaringBitmap validDocIds = new 
ThreadSafeMutableRoaringBitmap();
     while (recordInfoIterator.hasNext()) {
       RecordInfo recordInfo = recordInfoIterator.next();
       _primaryKeyToRecordLocationMap.compute(recordInfo._primaryKey, 
(primaryKey, currentRecordLocation) -> {
         if (currentRecordLocation != null) {
           // Existing primary key
 
-          if (segmentName.equals(currentRecordLocation.getSegmentName())) {
-            // The current record location has the same segment name
-
-            // Update the record location when the new timestamp is greater 
than or equal to the current timestamp.
-            // There are 2 scenarios:
-            //   1. The current record location is pointing to the same 
segment (the segment being added). In this case,
-            //      we want to update the record location when there is a tie 
to keep the newer record. Note that the
-            //      record info iterator will return records with incremental 
doc ids.
-            //   2. The current record location is pointing to the old segment 
being replaced. This could happen when
-            //      committing a consuming segment, or reloading a completed 
segment. In this case, we want to update
-            //      the record location when there is a tie because the record 
locations should point to the new added
-            //      segment instead of the old segment being replaced. Also, 
do not update the valid doc ids for the old
-            //      segment because it has not been replaced yet.
+          // The current record is in the same segment
+          // Update the record location when there is a tie to keep the newer 
record. Note that the record info iterator
+          // will return records with incremental doc ids.
+          IndexSegment currentSegment = currentRecordLocation.getSegment();
+          if (segment == currentSegment) {
             if (recordInfo._timestamp >= currentRecordLocation.getTimestamp()) 
{
-              // Only update the valid doc ids for the new segment
-              if (validDocIds == currentRecordLocation.getValidDocIds()) {
-                validDocIds.remove(currentRecordLocation.getDocId());
-              }
+              validDocIds.remove(currentRecordLocation.getDocId());
               validDocIds.add(recordInfo._docId);
-              return new RecordLocation(segmentName, recordInfo._docId, 
recordInfo._timestamp, validDocIds);
+              return new RecordLocation(segment, recordInfo._docId, 
recordInfo._timestamp);
             } else {
               return currentRecordLocation;
             }
-          } else {
-            // The current record location is pointing to a different segment
-
-            // Update the record location when getting a newer timestamp, or 
the timestamp is the same as the current
-            // timestamp, but the segment has a larger sequence number (the 
segment is newer than the current segment).
-            if (recordInfo._timestamp > currentRecordLocation.getTimestamp() 
|| (
-                recordInfo._timestamp == currentRecordLocation.getTimestamp()
-                    && 
LLCSegmentName.isLowLevelConsumerSegmentName(segmentName)
-                    && 
LLCSegmentName.isLowLevelConsumerSegmentName(currentRecordLocation.getSegmentName())
-                    && LLCSegmentName.getSequenceNumber(segmentName) > 
LLCSegmentName
-                    
.getSequenceNumber(currentRecordLocation.getSegmentName()))) {
-              
currentRecordLocation.getValidDocIds().remove(currentRecordLocation.getDocId());
+          }
+
+          // The current record is in an old segment being replaced
+          // This could happen when committing a consuming segment, or 
reloading a completed segment. In this case, we
+          // want to update the record location when there is a tie because 
the record locations should point to the new
+          // added segment instead of the old segment being replaced. Also, do 
not update the valid doc ids for the old
+          // segment because it has not been replaced yet.
+          String currentSegmentName = currentSegment.getSegmentName();
+          if (segmentName.equals(currentSegmentName)) {
+            if (recordInfo._timestamp >= currentRecordLocation.getTimestamp()) 
{
               validDocIds.add(recordInfo._docId);
-              return new RecordLocation(segmentName, recordInfo._docId, 
recordInfo._timestamp, validDocIds);
+              return new RecordLocation(segment, recordInfo._docId, 
recordInfo._timestamp);
             } else {
               return currentRecordLocation;
             }
           }
+
+          // The current record is in a different segment
+          // Update the record location when getting a newer timestamp, or the 
timestamp is the same as the current
+          // timestamp, but the segment has a larger sequence number (the 
segment is newer than the current segment).
+          if (recordInfo._timestamp > currentRecordLocation.getTimestamp() || (
+              recordInfo._timestamp == currentRecordLocation.getTimestamp() && 
LLCSegmentName
+                  .isLowLevelConsumerSegmentName(segmentName) && LLCSegmentName
+                  .isLowLevelConsumerSegmentName(currentSegmentName)
+                  && LLCSegmentName.getSequenceNumber(segmentName) > 
LLCSegmentName
+                  .getSequenceNumber(currentSegmentName))) {
+            assert currentSegment.getValidDocIds() != null;
+            
currentSegment.getValidDocIds().remove(currentRecordLocation.getDocId());
+            validDocIds.add(recordInfo._docId);
+            return new RecordLocation(segment, recordInfo._docId, 
recordInfo._timestamp);
+          } else {
+            return currentRecordLocation;
+          }
         } else {
           // New primary key
           validDocIds.add(recordInfo._docId);
-          return new RecordLocation(segmentName, recordInfo._docId, 
recordInfo._timestamp, validDocIds);
+          return new RecordLocation(segment, recordInfo._docId, 
recordInfo._timestamp);
         }
       });
     }
     // Update metrics
     _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, 
ServerGauge.UPSERT_PRIMARY_KEYS_COUNT,
         _primaryKeyToRecordLocationMap.size());
-    return validDocIds;
   }
 
   /**
-   * Updates the upsert metadata for a new consumed record in the given 
consuming segment.
+   * Updates the upsert metadata for a new consumed record in the given 
consuming segment. Returns the merged record if
+   * partial-upsert is enabled.
    */
-  public void updateRecord(String segmentName, RecordInfo recordInfo, 
ThreadSafeMutableRoaringBitmap validDocIds) {
+  public GenericRow updateRecord(IndexSegment segment, RecordInfo recordInfo, 
GenericRow record) {
+    // For partial-upsert, need to ensure all previous records are loaded 
before inserting new records.
+    if (_partialUpsertHandler != null) {
+      while (!_partialUpsertHandler.isAllSegmentsLoaded()) {
+        LOGGER
+            .info("Sleeping 1 second waiting for all segments loaded for 
partial-upsert table: {}", _tableNameWithType);
+        try {
+          //noinspection BusyWait
+          Thread.sleep(1000L);
+        } catch (InterruptedException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    }
+
+    _result = record;
     _primaryKeyToRecordLocationMap.compute(recordInfo._primaryKey, 
(primaryKey, currentRecordLocation) -> {
       if (currentRecordLocation != null) {
         // Existing primary key
@@ -153,35 +181,52 @@ public class PartitionUpsertMetadataManager {
         // Update the record location when the new timestamp is greater than 
or equal to the current timestamp. Update
         // the record location when there is a tie to keep the newer record.
         if (recordInfo._timestamp >= currentRecordLocation.getTimestamp()) {
-          
currentRecordLocation.getValidDocIds().remove(currentRecordLocation.getDocId());
-          validDocIds.add(recordInfo._docId);
-          return new RecordLocation(segmentName, recordInfo._docId, 
recordInfo._timestamp, validDocIds);
+          IndexSegment currentSegment = currentRecordLocation.getSegment();
+          if (_partialUpsertHandler != null) {
+            // Partial upsert
+            GenericRow previousRecord = 
currentSegment.getRecord(currentRecordLocation.getDocId(), _reuse);
+            _result = _partialUpsertHandler.merge(previousRecord, record);
+          }
+          assert currentSegment.getValidDocIds() != null;
+          
currentSegment.getValidDocIds().remove(currentRecordLocation.getDocId());
+          assert segment.getValidDocIds() != null;
+          segment.getValidDocIds().add(recordInfo._docId);
+          return new RecordLocation(segment, recordInfo._docId, 
recordInfo._timestamp);
         } else {
+          if (_partialUpsertHandler != null) {
+            LOGGER.warn(
+                "Got late event for partial upsert: {} (current timestamp: {}, 
record timestamp: {}), skipping updating the record",
+                record, currentRecordLocation.getTimestamp(), 
recordInfo._timestamp);
+          }
           return currentRecordLocation;
         }
       } else {
         // New primary key
-        validDocIds.add(recordInfo._docId);
-        return new RecordLocation(segmentName, recordInfo._docId, 
recordInfo._timestamp, validDocIds);
+        assert segment.getValidDocIds() != null;
+        segment.getValidDocIds().add(recordInfo._docId);
+        return new RecordLocation(segment, recordInfo._docId, 
recordInfo._timestamp);
       }
     });
     // Update metrics
     _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, 
ServerGauge.UPSERT_PRIMARY_KEYS_COUNT,
         _primaryKeyToRecordLocationMap.size());
+    return _result;
   }
 
   /**
    * Removes the upsert metadata for the given immutable segment. No need to 
remove the upsert metadata for the
    * consuming segment because it should be replaced by the committed segment.
    */
-  public void removeSegment(String segmentName, ThreadSafeMutableRoaringBitmap 
validDocIds) {
+  public void removeSegment(IndexSegment segment) {
+    String segmentName = segment.getSegmentName();
     LOGGER.info("Removing upsert metadata for segment: {}", segmentName);
 
-    if (!validDocIds.getMutableRoaringBitmap().isEmpty()) {
-      // Remove all the record locations that point to the valid doc ids of 
the removed segment.
+    assert segment.getValidDocIds() != null;
+    if (!segment.getValidDocIds().getMutableRoaringBitmap().isEmpty()) {
+      // Remove all the record locations that point to the removed segment
       _primaryKeyToRecordLocationMap.forEach((primaryKey, recordLocation) -> {
-        if (recordLocation.getValidDocIds() == validDocIds) {
-          // Check and remove to prevent removing the key that is just updated.
+        if (recordLocation.getSegment() == segment) {
+          // Check and remove to prevent removing the key that is just updated
           _primaryKeyToRecordLocationMap.remove(primaryKey, recordLocation);
         }
       });
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/RecordLocation.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/RecordLocation.java
index b02cfd3..038fb23 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/RecordLocation.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/RecordLocation.java
@@ -18,27 +18,25 @@
  */
 package org.apache.pinot.segment.local.upsert;
 
-import 
org.apache.pinot.segment.local.realtime.impl.ThreadSafeMutableRoaringBitmap;
+import org.apache.pinot.segment.spi.IndexSegment;
 
 
 /**
  * Indicate a record's location on the local host.
  */
 public class RecordLocation {
-  private final String _segmentName;
+  private final IndexSegment _segment;
   private final int _docId;
   private final long _timestamp;
-  private final ThreadSafeMutableRoaringBitmap _validDocIds;
 
-  public RecordLocation(String segmentName, int docId, long timestamp, 
ThreadSafeMutableRoaringBitmap validDocIds) {
-    _segmentName = segmentName;
+  public RecordLocation(IndexSegment indexSegment, int docId, long timestamp) {
+    _segment = indexSegment;
     _docId = docId;
     _timestamp = timestamp;
-    _validDocIds = validDocIds;
   }
 
-  public String getSegmentName() {
-    return _segmentName;
+  public IndexSegment getSegment() {
+    return _segment;
   }
 
   public int getDocId() {
@@ -48,8 +46,4 @@ public class RecordLocation {
   public long getTimestamp() {
     return _timestamp;
   }
-
-  public ThreadSafeMutableRoaringBitmap getValidDocIds() {
-    return _validDocIds;
-  }
 }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
index a43a3b2..ae8fb3f 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
@@ -20,6 +20,7 @@ package org.apache.pinot.segment.local.upsert;
 
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.pinot.common.metrics.ServerMetrics;
 
@@ -32,14 +33,17 @@ public class TableUpsertMetadataManager {
   private final Map<Integer, PartitionUpsertMetadataManager> 
_partitionMetadataManagerMap = new ConcurrentHashMap<>();
   private final String _tableNameWithType;
   private final ServerMetrics _serverMetrics;
+  private final PartialUpsertHandler _partialUpsertHandler;
 
-  public TableUpsertMetadataManager(String tableNameWithType, ServerMetrics 
serverMetrics) {
+  public TableUpsertMetadataManager(String tableNameWithType, ServerMetrics 
serverMetrics,
+      @Nullable PartialUpsertHandler partialUpsertHandler) {
     _tableNameWithType = tableNameWithType;
     _serverMetrics = serverMetrics;
+    _partialUpsertHandler = partialUpsertHandler;
   }
 
   public PartitionUpsertMetadataManager getOrCreatePartitionManager(int 
partitionId) {
-    return _partitionMetadataManagerMap
-        .computeIfAbsent(partitionId, k -> new 
PartitionUpsertMetadataManager(_tableNameWithType, k, _serverMetrics));
+    return _partitionMetadataManagerMap.computeIfAbsent(partitionId,
+        k -> new PartitionUpsertMetadataManager(_tableNameWithType, k, 
_serverMetrics, _partialUpsertHandler));
   }
 }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/ValidDocIndexReaderImpl.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/IncrementMerger.java
similarity index 54%
copy from 
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/ValidDocIndexReaderImpl.java
copy to 
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/IncrementMerger.java
index f09ab06..6d38ae2 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/ValidDocIndexReaderImpl.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/IncrementMerger.java
@@ -16,22 +16,29 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.segment.local.segment.index.readers;
+package org.apache.pinot.segment.local.upsert.merger;
 
-import 
org.apache.pinot.segment.local.realtime.impl.ThreadSafeMutableRoaringBitmap;
-import org.apache.pinot.segment.spi.index.reader.ValidDocIndexReader;
-import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
-
-
-public class ValidDocIndexReaderImpl implements ValidDocIndexReader {
-  private final ThreadSafeMutableRoaringBitmap _validDocBitmap;
-
-  public ValidDocIndexReaderImpl(ThreadSafeMutableRoaringBitmap 
validDocBitmap) {
-    _validDocBitmap = validDocBitmap;
+public class IncrementMerger implements PartialUpsertMerger {
+  IncrementMerger() {
   }
 
+  /**
+   * Increment the new value from incoming row to the given field of previous 
record.
+   */
   @Override
-  public ImmutableRoaringBitmap getValidDocBitmap() {
-    return _validDocBitmap.getMutableRoaringBitmap();
+  public Object merge(Object previousValue, Object currentValue) {
+    return addNumbers((Number) previousValue, (Number) currentValue);
+  }
+
+  private static Number addNumbers(Number a, Number b) {
+    if (a instanceof Integer) {
+      return (Integer) a + (Integer) b;
+    } else if (a instanceof Long) {
+      return (Long) a + (Long) b;
+    } else if (a instanceof Float) {
+      return (Float) a + (Float) b;
+    } else {
+      return (Double) a + (Double) b;
+    }
   }
 }
diff --git 
a/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/UpsertConfigTest.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/OverwriteMerger.java
similarity index 59%
copy from 
pinot-spi/src/test/java/org/apache/pinot/spi/config/table/UpsertConfigTest.java
copy to 
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/OverwriteMerger.java
index 9664a8e..a317d45 100644
--- 
a/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/UpsertConfigTest.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/OverwriteMerger.java
@@ -16,27 +16,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.spi.config.table;
+package org.apache.pinot.segment.local.upsert.merger;
 
-import org.testng.annotations.Test;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.fail;
-
-
-public class UpsertConfigTest {
-
-  @Test
-  public void testUpsertConfig() {
-    UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
-    assertEquals(upsertConfig.getMode(), UpsertConfig.Mode.FULL);
+public class OverwriteMerger implements PartialUpsertMerger {
+  OverwriteMerger() {
+  }
 
-    // Test illegal arguments
-    try {
-      new UpsertConfig(UpsertConfig.Mode.PARTIAL);
-      fail();
-    } catch (IllegalArgumentException e) {
-      // Expected
-    }
+  @Override
+  public Object merge(Object previousValue, Object currentValue) {
+    return currentValue;
   }
 }
diff --git 
a/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/UpsertConfigTest.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMerger.java
similarity index 59%
copy from 
pinot-spi/src/test/java/org/apache/pinot/spi/config/table/UpsertConfigTest.java
copy to 
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMerger.java
index 9664a8e..817d953 100644
--- 
a/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/UpsertConfigTest.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMerger.java
@@ -16,27 +16,15 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.spi.config.table;
+package org.apache.pinot.segment.local.upsert.merger;
 
-import org.testng.annotations.Test;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.fail;
-
-
-public class UpsertConfigTest {
-
-  @Test
-  public void testUpsertConfig() {
-    UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
-    assertEquals(upsertConfig.getMode(), UpsertConfig.Mode.FULL);
-
-    // Test illegal arguments
-    try {
-      new UpsertConfig(UpsertConfig.Mode.PARTIAL);
-      fail();
-    } catch (IllegalArgumentException e) {
-      // Expected
-    }
-  }
+public interface PartialUpsertMerger {
+  /**
+   * Handle partial upsert merge.
+   *
+   * @param previousValue the value of given field from the last derived full 
record during ingestion.
+   * @param currentValue the value of given field from the new consumed record.
+   * @return a new value after merge
+   */
+  Object merge(Object previousValue, Object currentValue);
 }
diff --git 
a/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/UpsertConfigTest.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactory.java
similarity index 54%
copy from 
pinot-spi/src/test/java/org/apache/pinot/spi/config/table/UpsertConfigTest.java
copy to 
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactory.java
index 9664a8e..36112c4 100644
--- 
a/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/UpsertConfigTest.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactory.java
@@ -16,27 +16,26 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.spi.config.table;
+package org.apache.pinot.segment.local.upsert.merger;
 
-import org.testng.annotations.Test;
+import org.apache.pinot.spi.config.table.UpsertConfig;
 
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.fail;
 
+public class PartialUpsertMergerFactory {
+  private PartialUpsertMergerFactory() {
+  }
 
-public class UpsertConfigTest {
-
-  @Test
-  public void testUpsertConfig() {
-    UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
-    assertEquals(upsertConfig.getMode(), UpsertConfig.Mode.FULL);
+  private static final OverwriteMerger OVERWRITE_MERGER = new 
OverwriteMerger();
+  private static final IncrementMerger INCREMENT_MERGER = new 
IncrementMerger();
 
-    // Test illegal arguments
-    try {
-      new UpsertConfig(UpsertConfig.Mode.PARTIAL);
-      fail();
-    } catch (IllegalArgumentException e) {
-      // Expected
+  public static PartialUpsertMerger getMerger(UpsertConfig.Strategy strategy) {
+    switch (strategy) {
+      case OVERWRITE:
+        return OVERWRITE_MERGER;
+      case INCREMENT:
+        return INCREMENT_MERGER;
+      default:
+        throw new IllegalStateException("Unsupported partial upsert strategy: 
" + strategy);
     }
   }
 }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index 65cf5f9..ce88c15 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -103,6 +103,7 @@ public final class TableConfigUtils {
     validateIndexingConfig(tableConfig.getIndexingConfig(), schema);
     validateFieldConfigList(tableConfig.getFieldConfigList(), 
tableConfig.getIndexingConfig(), schema);
     validateUpsertConfig(tableConfig, schema);
+    validatePartialUpsertStrategies(tableConfig, schema);
     validateTaskConfigs(tableConfig);
   }
 
@@ -324,7 +325,7 @@ public final class TableConfigUtils {
    *  - consumer type must be low-level
    */
   @VisibleForTesting
-  public static void validateUpsertConfig(TableConfig tableConfig, Schema 
schema) {
+  static void validateUpsertConfig(TableConfig tableConfig, Schema schema) {
     if (tableConfig.getUpsertMode() == UpsertConfig.Mode.NONE) {
       return;
     }
@@ -351,6 +352,43 @@ public final class TableConfigUtils {
   }
 
   /**
+   * Validates the partial upsert-related configurations:
+   *  - Null handling must be enabled
+   *  - Merger cannot be applied to primary key columns
+   *  - Merger cannot be applied to non-existing columns
+   *  - INCREMENT merger must be applied to numeric columns
+   */
+  @VisibleForTesting
+  static void validatePartialUpsertStrategies(TableConfig tableConfig, Schema 
schema) {
+    if (tableConfig.getUpsertMode() != UpsertConfig.Mode.PARTIAL) {
+      return;
+    }
+
+    
Preconditions.checkState(tableConfig.getIndexingConfig().isNullHandlingEnabled(),
+        "Null handling must be enabled for partial upsert tables");
+
+    UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
+    assert upsertConfig != null;
+    Map<String, UpsertConfig.Strategy> partialUpsertStrategies = 
upsertConfig.getPartialUpsertStrategies();
+
+    List<String> primaryKeyColumns = schema.getPrimaryKeyColumns();
+    for (Map.Entry<String, UpsertConfig.Strategy> entry : 
partialUpsertStrategies.entrySet()) {
+      String column = entry.getKey();
+      Preconditions.checkState(!primaryKeyColumns.contains(column), "Merger 
cannot be applied to primary key columns");
+
+      FieldSpec fieldSpec = schema.getFieldSpecFor(column);
+      Preconditions.checkState(fieldSpec != null, "Merger cannot be applied to 
non-existing column: %s", column);
+
+      if (entry.getValue() == UpsertConfig.Strategy.INCREMENT) {
+        
Preconditions.checkState(fieldSpec.getDataType().getStoredType().isNumeric(),
+            "INCREMENT merger cannot be applied to non-numeric column: %s", 
column);
+        Preconditions.checkState(!schema.getDateTimeNames().contains(column),
+            "INCREMENT merger cannot be applied to date time column: %s", 
column);
+      }
+    }
+  }
+
+  /**
    * Validates the tier configs
    * Checks for the right segmentSelectorType and its required properties
    * Checks for the right storageType and its required properties
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
index 2e5e6b3..1372cfa 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
@@ -61,7 +61,7 @@ public class MutableSegmentImplUpsertTest {
     _recordTransformer = 
CompositeTransformer.getDefaultTransformer(_tableConfig, _schema);
     File jsonFile = new File(dataResourceUrl.getFile());
     _partitionUpsertMetadataManager =
-        new TableUpsertMetadataManager("testTable_REALTIME", 
Mockito.mock(ServerMetrics.class))
+        new TableUpsertMetadataManager("testTable_REALTIME", 
Mockito.mock(ServerMetrics.class), null)
             .getOrCreatePartitionManager(0);
     _mutableSegmentImpl = MutableSegmentImplTestUtils
         .createMutableSegmentImpl(_schema, Collections.emptySet(), 
Collections.emptySet(), Collections.emptySet(),
@@ -81,7 +81,7 @@ public class MutableSegmentImplUpsertTest {
 
   @Test
   public void testUpsertIngestion() {
-    ImmutableRoaringBitmap bitmap = 
_mutableSegmentImpl.getValidDocIndex().getValidDocBitmap();
+    ImmutableRoaringBitmap bitmap = 
_mutableSegmentImpl.getValidDocIds().getMutableRoaringBitmap();
     Assert.assertFalse(bitmap.contains(0));
     Assert.assertTrue(bitmap.contains(1));
     Assert.assertTrue(bitmap.contains(2));
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java
new file mode 100644
index 0000000..5335d51
--- /dev/null
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.upsert;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.helix.HelixManager;
+import org.apache.pinot.spi.config.table.UpsertConfig;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.mockito.Mockito;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+
+public class PartialUpsertHandlerTest {
+
+  @Test
+  public void testMerge() {
+    HelixManager helixManager = Mockito.mock(HelixManager.class);
+    String realtimeTableName = "testTable_REALTIME";
+    Map<String, UpsertConfig.Strategy> partialUpsertStrategies = new 
HashMap<>();
+    partialUpsertStrategies.put("field1", UpsertConfig.Strategy.INCREMENT);
+    PartialUpsertHandler handler = new PartialUpsertHandler(helixManager, 
realtimeTableName, partialUpsertStrategies);
+
+    // both records are null.
+    GenericRow previousRecord = new GenericRow();
+    GenericRow incomingRecord = new GenericRow();
+
+    previousRecord.putDefaultNullValue("field1", 1);
+    incomingRecord.putDefaultNullValue("field1", 2);
+    GenericRow newRecord = handler.merge(previousRecord, incomingRecord);
+    assertTrue(newRecord.isNullValue("field1"));
+    assertEquals(newRecord.getValue("field1"), 2);
+
+    // previousRecord is null default value, while newRecord is not.
+    previousRecord.clear();
+    incomingRecord.clear();
+    previousRecord.putDefaultNullValue("field1", 1);
+    incomingRecord.putValue("field1", 2);
+    newRecord = handler.merge(previousRecord, incomingRecord);
+    assertFalse(newRecord.isNullValue("field1"));
+    assertEquals(newRecord.getValue("field1"), 2);
+
+    // newRecord is default null value, while previousRecord is not.
+    previousRecord.clear();
+    incomingRecord.clear();
+    previousRecord.putValue("field1", 1);
+    incomingRecord.putDefaultNullValue("field1", 2);
+    newRecord = handler.merge(previousRecord, incomingRecord);
+    assertFalse(newRecord.isNullValue("field1"));
+    assertEquals(newRecord.getValue("field1"), 1);
+
+    // neither of records is null.
+    previousRecord.clear();
+    incomingRecord.clear();
+    previousRecord.putValue("field1", 1);
+    incomingRecord.putValue("field1", 2);
+    newRecord = handler.merge(previousRecord, incomingRecord);
+    assertFalse(newRecord.isNullValue("field1"));
+    assertEquals(newRecord.getValue("field1"), 3);
+  }
+}
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManagerTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManagerTest.java
similarity index 76%
rename from 
pinot-core/src/test/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManagerTest.java
rename to 
pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManagerTest.java
index 65a45ed..eefa175 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManagerTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManagerTest.java
@@ -16,21 +16,23 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.upsert;
+package org.apache.pinot.segment.local.upsert;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.utils.LLCSegmentName;
-import 
org.apache.pinot.segment.local.realtime.impl.ThreadSafeMutableRoaringBitmap;
-import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
-import org.apache.pinot.segment.local.upsert.RecordLocation;
+import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.index.ThreadSafeMutableRoaringBitmap;
+import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.PrimaryKey;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
-import org.mockito.Mockito;
 import org.testng.annotations.Test;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
@@ -44,11 +46,10 @@ public class PartitionUpsertMetadataManagerTest {
   @Test
   public void testAddSegment() {
     PartitionUpsertMetadataManager upsertMetadataManager =
-        new PartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, 
Mockito.mock(ServerMetrics.class));
-    Map<PrimaryKey, RecordLocation> recordLocationMap = 
upsertMetadataManager.getPrimaryKeyToRecordLocationMap();
+        new PartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, 
mock(ServerMetrics.class), null);
+    Map<PrimaryKey, RecordLocation> recordLocationMap = 
upsertMetadataManager._primaryKeyToRecordLocationMap;
 
     // Add the first segment
-    String segment1 = getSegmentName(1);
     List<PartitionUpsertMetadataManager.RecordInfo> recordInfoList1 = new 
ArrayList<>();
     recordInfoList1.add(new 
PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(0), 0, 100));
     recordInfoList1.add(new 
PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(1), 1, 100));
@@ -56,8 +57,9 @@ public class PartitionUpsertMetadataManagerTest {
     recordInfoList1.add(new 
PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(0), 3, 80));
     recordInfoList1.add(new 
PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(1), 4, 120));
     recordInfoList1.add(new 
PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(0), 5, 100));
-    ThreadSafeMutableRoaringBitmap validDocIds1 =
-        upsertMetadataManager.addSegment(segment1, recordInfoList1.iterator());
+    ThreadSafeMutableRoaringBitmap validDocIds1 = new 
ThreadSafeMutableRoaringBitmap();
+    ImmutableSegmentImpl segment1 = mockSegment(1, validDocIds1);
+    upsertMetadataManager.addSegment(segment1, recordInfoList1.iterator());
     // segment1: 0 -> {5, 100}, 1 -> {4, 120}, 2 -> {2, 100}
     checkRecordLocation(recordLocationMap, 0, segment1, 5, 100);
     checkRecordLocation(recordLocationMap, 1, segment1, 4, 120);
@@ -65,15 +67,15 @@ public class PartitionUpsertMetadataManagerTest {
     assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{2, 4, 5});
 
     // Add the second segment
-    String segment2 = getSegmentName(2);
     ArrayList<PartitionUpsertMetadataManager.RecordInfo> recordInfoList2 = new 
ArrayList<>();
     recordInfoList2.add(new 
PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(0), 0, 100));
     recordInfoList2.add(new 
PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(1), 1, 100));
     recordInfoList2.add(new 
PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(2), 2, 120));
     recordInfoList2.add(new 
PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(3), 3, 80));
     recordInfoList2.add(new 
PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(0), 4, 80));
-    ThreadSafeMutableRoaringBitmap validDocIds2 =
-        upsertMetadataManager.addSegment(segment2, recordInfoList2.iterator());
+    ThreadSafeMutableRoaringBitmap validDocIds2 = new 
ThreadSafeMutableRoaringBitmap();
+    ImmutableSegmentImpl segment2 = mockSegment(2, validDocIds2);
+    upsertMetadataManager.addSegment(segment2, recordInfoList2.iterator());
     // segment1: 1 -> {4, 120}
     // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
     checkRecordLocation(recordLocationMap, 0, segment2, 0, 100);
@@ -84,31 +86,40 @@ public class PartitionUpsertMetadataManagerTest {
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{0, 2, 3});
 
     // Replace (reload) the first segment
-    ThreadSafeMutableRoaringBitmap newValidDocIds1 =
-        upsertMetadataManager.addSegment(segment1, recordInfoList1.iterator());
+    ThreadSafeMutableRoaringBitmap newValidDocIds1 = new 
ThreadSafeMutableRoaringBitmap();
+    ImmutableSegmentImpl newSegment1 = mockSegment(1, newValidDocIds1);
+    upsertMetadataManager.addSegment(newSegment1, recordInfoList1.iterator());
     // original segment1: 1 -> {4, 120}
     // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
     // new segment1: 1 -> {4, 120}
     checkRecordLocation(recordLocationMap, 0, segment2, 0, 100);
-    checkRecordLocation(recordLocationMap, 1, segment1, 4, 120);
+    checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120);
     checkRecordLocation(recordLocationMap, 2, segment2, 2, 120);
     checkRecordLocation(recordLocationMap, 3, segment2, 3, 80);
     assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{4});
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{0, 2, 3});
     assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{4});
-    assertSame(recordLocationMap.get(getPrimaryKey(1)).getValidDocIds(), 
newValidDocIds1);
+    assertSame(recordLocationMap.get(getPrimaryKey(1)).getSegment(), 
newSegment1);
 
     // Remove the original segment1
-    upsertMetadataManager.removeSegment(segment1, validDocIds1);
+    upsertMetadataManager.removeSegment(segment1);
     // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
     // new segment1: 1 -> {4, 120}
     checkRecordLocation(recordLocationMap, 0, segment2, 0, 100);
-    checkRecordLocation(recordLocationMap, 1, segment1, 4, 120);
+    checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120);
     checkRecordLocation(recordLocationMap, 2, segment2, 2, 120);
     checkRecordLocation(recordLocationMap, 3, segment2, 3, 80);
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{0, 2, 3});
     assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{4});
-    assertSame(recordLocationMap.get(getPrimaryKey(1)).getValidDocIds(), 
newValidDocIds1);
+    assertSame(recordLocationMap.get(getPrimaryKey(1)).getSegment(), 
newSegment1);
+  }
+
+  private static ImmutableSegmentImpl mockSegment(int sequenceNumber, 
ThreadSafeMutableRoaringBitmap validDocIds) {
+    ImmutableSegmentImpl segment = mock(ImmutableSegmentImpl.class);
+    String segmentName = getSegmentName(sequenceNumber);
+    when(segment.getSegmentName()).thenReturn(segmentName);
+    when(segment.getValidDocIds()).thenReturn(validDocIds);
+    return segment;
   }
 
   private static String getSegmentName(int sequenceNumber) {
@@ -120,10 +131,10 @@ public class PartitionUpsertMetadataManagerTest {
   }
 
   private static void checkRecordLocation(Map<PrimaryKey, RecordLocation> 
recordLocationMap, int keyValue,
-      String segmentName, int docId, long timestamp) {
+      IndexSegment segment, int docId, long timestamp) {
     RecordLocation recordLocation = 
recordLocationMap.get(getPrimaryKey(keyValue));
     assertNotNull(recordLocation);
-    assertEquals(recordLocation.getSegmentName(), segmentName);
+    assertSame(recordLocation.getSegment(), segment);
     assertEquals(recordLocation.getDocId(), docId);
     assertEquals(recordLocation.getTimestamp(), timestamp);
   }
@@ -131,25 +142,26 @@ public class PartitionUpsertMetadataManagerTest {
   @Test
   public void testUpdateRecord() {
     PartitionUpsertMetadataManager upsertMetadataManager =
-        new PartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, 
Mockito.mock(ServerMetrics.class));
-    Map<PrimaryKey, RecordLocation> recordLocationMap = 
upsertMetadataManager.getPrimaryKeyToRecordLocationMap();
+        new PartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, 
mock(ServerMetrics.class), null);
+    Map<PrimaryKey, RecordLocation> recordLocationMap = 
upsertMetadataManager._primaryKeyToRecordLocationMap;
 
     // Add the first segment
     // segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}
-    String segment1 = getSegmentName(1);
     List<PartitionUpsertMetadataManager.RecordInfo> recordInfoList1 = new 
ArrayList<>();
     recordInfoList1.add(new 
PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(0), 0, 100));
     recordInfoList1.add(new 
PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(1), 1, 120));
     recordInfoList1.add(new 
PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(2), 2, 100));
-    ThreadSafeMutableRoaringBitmap validDocIds1 =
-        upsertMetadataManager.addSegment(segment1, recordInfoList1.iterator());
+    ThreadSafeMutableRoaringBitmap validDocIds1 = new 
ThreadSafeMutableRoaringBitmap();
+    ImmutableSegmentImpl segment1 = mockSegment(1, validDocIds1);
+    upsertMetadataManager.addSegment(segment1, recordInfoList1.iterator());
 
     // Update records from the second segment
-    String segment2 = getSegmentName(2);
     ThreadSafeMutableRoaringBitmap validDocIds2 = new 
ThreadSafeMutableRoaringBitmap();
+    IndexSegment segment2 = mockSegment(2, validDocIds2);
 
+    GenericRow row = mock(GenericRow.class);
     upsertMetadataManager
-        .updateRecord(segment2, new 
PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(3), 0, 100), 
validDocIds2);
+        .updateRecord(segment2, new 
PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(3), 0, 100), row);
     // segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}
     // segment2: 3 -> {0, 100}
     checkRecordLocation(recordLocationMap, 0, segment1, 0, 100);
@@ -160,7 +172,7 @@ public class PartitionUpsertMetadataManagerTest {
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{0});
 
     upsertMetadataManager
-        .updateRecord(segment2, new 
PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(2), 1, 120), 
validDocIds2);
+        .updateRecord(segment2, new 
PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(2), 1, 120), row);
     // segment1: 0 -> {0, 100}, 1 -> {1, 120}
     // segment2: 2 -> {1, 120}, 3 -> {0, 100}
     checkRecordLocation(recordLocationMap, 0, segment1, 0, 100);
@@ -171,7 +183,7 @@ public class PartitionUpsertMetadataManagerTest {
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{0, 1});
 
     upsertMetadataManager
-        .updateRecord(segment2, new 
PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(1), 2, 100), 
validDocIds2);
+        .updateRecord(segment2, new 
PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(1), 2, 100), row);
     // segment1: 0 -> {0, 100}, 1 -> {1, 120}
     // segment2: 2 -> {1, 120}, 3 -> {0, 100}
     checkRecordLocation(recordLocationMap, 0, segment1, 0, 100);
@@ -182,7 +194,7 @@ public class PartitionUpsertMetadataManagerTest {
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{0, 1});
 
     upsertMetadataManager
-        .updateRecord(segment2, new 
PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(0), 3, 100), 
validDocIds2);
+        .updateRecord(segment2, new 
PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(0), 3, 100), row);
     // segment1: 1 -> {1, 120}
     // segment2: 0 -> {3, 100}, 2 -> {1, 120}, 3 -> {0, 100}
     checkRecordLocation(recordLocationMap, 0, segment2, 3, 100);
@@ -196,27 +208,27 @@ public class PartitionUpsertMetadataManagerTest {
   @Test
   public void testRemoveSegment() {
     PartitionUpsertMetadataManager upsertMetadataManager =
-        new PartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, 
Mockito.mock(ServerMetrics.class));
-    Map<PrimaryKey, RecordLocation> recordLocationMap = 
upsertMetadataManager.getPrimaryKeyToRecordLocationMap();
+        new PartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, 
mock(ServerMetrics.class), null);
+    Map<PrimaryKey, RecordLocation> recordLocationMap = 
upsertMetadataManager._primaryKeyToRecordLocationMap;
 
     // Add 2 segments
     // segment1: 0 -> {0, 100}, 1 -> {1, 100}
     // segment2: 2 -> {0, 100}, 3 -> {0, 100}
-    String segment1 = getSegmentName(1);
     List<PartitionUpsertMetadataManager.RecordInfo> recordInfoList1 = new 
ArrayList<>();
     recordInfoList1.add(new 
PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(0), 0, 100));
     recordInfoList1.add(new 
PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(1), 1, 100));
-    ThreadSafeMutableRoaringBitmap validDocIds1 =
-        upsertMetadataManager.addSegment(segment1, recordInfoList1.iterator());
-    String segment2 = getSegmentName(2);
+    ThreadSafeMutableRoaringBitmap validDocIds1 = new 
ThreadSafeMutableRoaringBitmap();
+    ImmutableSegmentImpl segment1 = mockSegment(1, validDocIds1);
+    upsertMetadataManager.addSegment(segment1, recordInfoList1.iterator());
     List<PartitionUpsertMetadataManager.RecordInfo> recordInfoList2 = new 
ArrayList<>();
     recordInfoList2.add(new 
PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(2), 0, 100));
     recordInfoList2.add(new 
PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(3), 1, 100));
-    ThreadSafeMutableRoaringBitmap validDocIds2 =
-        upsertMetadataManager.addSegment(segment2, recordInfoList2.iterator());
+    ThreadSafeMutableRoaringBitmap validDocIds2 = new 
ThreadSafeMutableRoaringBitmap();
+    ImmutableSegmentImpl segment2 = mockSegment(2, validDocIds2);
+    upsertMetadataManager.addSegment(segment2, recordInfoList2.iterator());
 
     // Remove the first segment
-    upsertMetadataManager.removeSegment(segment1, validDocIds1);
+    upsertMetadataManager.removeSegment(segment1);
     // segment2: 2 -> {0, 100}, 3 -> {0, 100}
     assertNull(recordLocationMap.get(getPrimaryKey(0)));
     assertNull(recordLocationMap.get(getPrimaryKey(1)));
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/util/TableConfigUtilsTest.java 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
similarity index 90%
rename from 
pinot-core/src/test/java/org/apache/pinot/core/util/TableConfigUtilsTest.java
rename to 
pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index b3620a9..6d76c88 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/util/TableConfigUtilsTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.util;
+package org.apache.pinot.segment.local.utils;
 
 import com.google.common.collect.Lists;
 import java.util.Arrays;
@@ -25,8 +25,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.pinot.common.tier.TierFactory;
-import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
-import org.apache.pinot.segment.local.utils.TableConfigUtils;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
 import 
org.apache.pinot.segment.spi.index.startree.AggregationFunctionColumnPair;
 import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
@@ -243,7 +241,8 @@ public class TableConfigUtilsTest {
 
     // valid filterFunction
     tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
-        .setIngestionConfig(new IngestionConfig(null, null, new 
FilterConfig("Groovy({x == 10}, x)"), null, null)).build();
+        .setIngestionConfig(new IngestionConfig(null, null, new 
FilterConfig("Groovy({x == 10}, x)"), null, null))
+        .build();
     TableConfigUtils.validate(tableConfig, schema);
 
     // invalid filter function
@@ -272,8 +271,8 @@ public class TableConfigUtilsTest {
 
     // transformed column not in schema
     tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(
-        new IngestionConfig(null, null, null, Lists.newArrayList(new 
TransformConfig("myCol", "reverse(anotherCol)")), null))
-        .build();
+        new IngestionConfig(null, null, null, Lists.newArrayList(new 
TransformConfig("myCol", "reverse(anotherCol)")),
+            null)).build();
     try {
       TableConfigUtils.validate(tableConfig, schema);
       Assert.fail("Should fail for transformedColumn not present in schema");
@@ -286,8 +285,8 @@ public class TableConfigUtilsTest {
             .build();
     // valid transform configs
     tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(
-        new IngestionConfig(null, null, null, Lists.newArrayList(new 
TransformConfig("myCol", "reverse(anotherCol)")), null))
-        .build();
+        new IngestionConfig(null, null, null, Lists.newArrayList(new 
TransformConfig("myCol", "reverse(anotherCol)")),
+            null)).build();
     TableConfigUtils.validate(tableConfig, schema);
 
     schema =
@@ -301,8 +300,8 @@ public class TableConfigUtilsTest {
 
     // null transform column name
     tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(
-        new IngestionConfig(null, null, null, Lists.newArrayList(new 
TransformConfig(null, "reverse(anotherCol)")), null))
-        .build();
+        new IngestionConfig(null, null, null, Lists.newArrayList(new 
TransformConfig(null, "reverse(anotherCol)")),
+            null)).build();
     try {
       TableConfigUtils.validate(tableConfig, schema);
       Assert.fail("Should fail for null column name in transform config");
@@ -322,8 +321,8 @@ public class TableConfigUtilsTest {
 
     // invalid function
     tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(
-        new IngestionConfig(null, null, null, Lists.newArrayList(new 
TransformConfig("myCol", "fakeFunction(col)")), null))
-        .build();
+        new IngestionConfig(null, null, null, Lists.newArrayList(new 
TransformConfig("myCol", "fakeFunction(col)")),
+            null)).build();
     try {
       TableConfigUtils.validate(tableConfig, schema);
       Assert.fail("Should fail for invalid transform function in transform 
config");
@@ -333,8 +332,8 @@ public class TableConfigUtilsTest {
 
     // invalid function
     tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(
-        new IngestionConfig(null, null, null, Lists.newArrayList(new 
TransformConfig("myCol", "Groovy(badExpr)")), null))
-        .build();
+        new IngestionConfig(null, null, null, Lists.newArrayList(new 
TransformConfig("myCol", "Groovy(badExpr)")),
+            null)).build();
     try {
       TableConfigUtils.validate(tableConfig, schema);
       Assert.fail("Should fail for invalid transform function in transform 
config");
@@ -367,8 +366,8 @@ public class TableConfigUtilsTest {
     // duplicate transform config
     tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(
         new IngestionConfig(null, null, null,
-            Lists.newArrayList(new TransformConfig("myCol", "reverse(x)"), new 
TransformConfig("myCol", "lower(y)")), null))
-        .build();
+            Lists.newArrayList(new TransformConfig("myCol", "reverse(x)"), new 
TransformConfig("myCol", "lower(y)")),
+            null)).build();
     try {
       TableConfigUtils.validate(tableConfig, schema);
       Assert.fail("Should fail due to duplicate transform config");
@@ -385,9 +384,10 @@ public class TableConfigUtilsTest {
 
   @Test
   public void ingestionStreamConfigsTest() {
-    Map<String, String> fakeMap = 
FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap();
+    Map<String, String> streamConfigs = getStreamConfigs();
     IngestionConfig ingestionConfig =
-        new IngestionConfig(null, new 
StreamIngestionConfig(Lists.newArrayList(fakeMap, fakeMap)), null, null, null);
+        new IngestionConfig(null, new 
StreamIngestionConfig(Lists.newArrayList(streamConfigs, streamConfigs)), null,
+            null, null);
     TableConfig tableConfig =
         new 
TableConfigBuilder(TableType.REALTIME).setTableName("myTable_REALTIME").setTimeColumnName("timeColumn")
             .setIngestionConfig(ingestionConfig).build();
@@ -401,15 +401,16 @@ public class TableConfigUtilsTest {
     }
 
     // stream config should be valid
-    ingestionConfig = new IngestionConfig(null, new 
StreamIngestionConfig(Lists.newArrayList(fakeMap)), null, null, null);
+    ingestionConfig =
+        new IngestionConfig(null, new 
StreamIngestionConfig(Lists.newArrayList(streamConfigs)), null, null, null);
     tableConfig.setIngestionConfig(ingestionConfig);
     TableConfigUtils.validateIngestionConfig(tableConfig, null);
 
-    fakeMap.remove(StreamConfigProperties.STREAM_TYPE);
+    streamConfigs.remove(StreamConfigProperties.STREAM_TYPE);
     try {
       TableConfigUtils.validateIngestionConfig(tableConfig, null);
       Assert.fail("Should fail for invalid stream configs map");
-    } catch (Exception e) {
+    } catch (IllegalStateException e) {
       // expected
     }
   }
@@ -724,7 +725,8 @@ public class TableConfigUtilsTest {
     tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
     try {
       FieldConfig fieldConfig =
-          new FieldConfig("intCol", FieldConfig.EncodingType.DICTIONARY, null, 
FieldConfig.CompressionCodec.SNAPPY, null);
+          new FieldConfig("intCol", FieldConfig.EncodingType.DICTIONARY, null, 
FieldConfig.CompressionCodec.SNAPPY,
+              null);
       tableConfig.setFieldConfigList(Arrays.asList(fieldConfig));
       TableConfigUtils.validate(tableConfig, schema);
       Assert.fail("Should fail since dictionary encoding does not support 
compression codec snappy");
@@ -735,7 +737,8 @@ public class TableConfigUtilsTest {
     tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
     try {
       FieldConfig fieldConfig =
-          new FieldConfig("intCol", FieldConfig.EncodingType.DICTIONARY, null, 
FieldConfig.CompressionCodec.ZSTANDARD, null);
+          new FieldConfig("intCol", FieldConfig.EncodingType.DICTIONARY, null, 
FieldConfig.CompressionCodec.ZSTANDARD,
+              null);
       tableConfig.setFieldConfigList(Arrays.asList(fieldConfig));
       TableConfigUtils.validate(tableConfig, schema);
       Assert.fail("Should fail since dictionary encoding does not support 
compression codec zstandard");
@@ -899,8 +902,8 @@ public class TableConfigUtilsTest {
       // expected
     }
 
-    starTreeIndexConfig =
-        new StarTreeIndexConfig(Arrays.asList("multiValCol"), 
Arrays.asList("multiValCol"), Arrays.asList("SUM__multiValCol"), 1);
+    starTreeIndexConfig = new 
StarTreeIndexConfig(Arrays.asList("multiValCol"), Arrays.asList("multiValCol"),
+        Arrays.asList("SUM__multiValCol"), 1);
     tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
         .setStarTreeIndexConfigs(Arrays.asList(starTreeIndexConfig)).build();
     try {
@@ -948,8 +951,9 @@ public class TableConfigUtilsTest {
       // expected
     }
 
-    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
-        .setJsonIndexColumns(Arrays.asList("intCol")).build();
+    tableConfig =
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setJsonIndexColumns(Arrays.asList("intCol"))
+            .build();
     try {
       TableConfigUtils.validate(tableConfig, schema);
       Assert.fail("Should fail for Json index defined on non string column");
@@ -966,8 +970,8 @@ public class TableConfigUtilsTest {
       // expected
     }
 
-    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setRangeIndexColumns(columnList).
-        build();
+    tableConfig =
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setRangeIndexColumns(columnList).build();
     try {
       TableConfigUtils.validate(tableConfig, schema);
     } catch (Exception e) {
@@ -975,8 +979,7 @@ public class TableConfigUtilsTest {
     }
 
     tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setRangeIndexColumns(columnList)
-        .setNoDictionaryColumns(columnList).
-            build();
+        .setNoDictionaryColumns(columnList).build();
     try {
       TableConfigUtils.validate(tableConfig, schema);
       Assert.fail("Should fail for range index defined on non 
numeric/no-dictionary column");
@@ -984,8 +987,8 @@ public class TableConfigUtilsTest {
       // Expected
     }
 
-    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setVarLengthDictionaryColumns(Arrays.asList("intCol")).
-        build();
+    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
+        .setVarLengthDictionaryColumns(Arrays.asList("intCol")).build();
     try {
       TableConfigUtils.validate(tableConfig, schema);
       Assert.fail("Should fail for Var length dictionary defined for non 
string/bytes column");
@@ -993,8 +996,8 @@ public class TableConfigUtilsTest {
       // expected
     }
 
-    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setJsonIndexColumns(Arrays.asList("multiValCol")).
-        build();
+    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
+        .setJsonIndexColumns(Arrays.asList("multiValCol")).build();
     try {
       TableConfigUtils.validate(tableConfig, schema);
       Assert.fail("Should fail for Json Index defined on a multi value 
column");
@@ -1036,56 +1039,58 @@ public class TableConfigUtilsTest {
         .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL)).build();
     try {
       TableConfigUtils.validateUpsertConfig(tableConfig, schema);
-    } catch (Exception e) {
+      Assert.fail();
+    } catch (IllegalStateException e) {
       Assert.assertEquals(e.getMessage(), "Upsert table is for realtime table 
only.");
     }
+
     tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
         .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL)).build();
     try {
       TableConfigUtils.validateUpsertConfig(tableConfig, schema);
-    } catch (Exception e) {
+      Assert.fail();
+    } catch (IllegalStateException e) {
       Assert.assertEquals(e.getMessage(), "Upsert table must have primary key 
columns in the schema");
     }
+
     schema =
         new 
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension("myCol",
 FieldSpec.DataType.STRING)
             .setPrimaryKeyColumns(Lists.newArrayList("myCol")).build();
     try {
       TableConfigUtils.validateUpsertConfig(tableConfig, schema);
-    } catch (Exception e) {
+      Assert.fail();
+    } catch (IllegalStateException e) {
       Assert
           .assertEquals(e.getMessage(), "Could not find streamConfigs for 
REALTIME table: " + TABLE_NAME + "_REALTIME");
     }
-    Map<String, String> streamConfigs = new HashMap<>();
-    streamConfigs.put("stream.kafka.consumer.type", "highLevel");
-    streamConfigs.put("streamType", "kafka");
-    streamConfigs.put("stream.kafka.topic.name", "test");
-    streamConfigs
-        .put("stream.kafka.decoder.class.name", 
"org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder");
+
+    Map<String, String> streamConfigs = getStreamConfigs();
     tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
         .setUpsertConfig(new 
UpsertConfig(UpsertConfig.Mode.FULL)).setStreamConfigs(streamConfigs).build();
     try {
       TableConfigUtils.validateUpsertConfig(tableConfig, schema);
-    } catch (Exception e) {
+      Assert.fail();
+    } catch (IllegalStateException e) {
       Assert.assertEquals(e.getMessage(), "Upsert table must use low-level 
streaming consumer type");
     }
+
     streamConfigs.put("stream.kafka.consumer.type", "simple");
     tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
         .setUpsertConfig(new 
UpsertConfig(UpsertConfig.Mode.FULL)).setStreamConfigs(streamConfigs).build();
     try {
       TableConfigUtils.validateUpsertConfig(tableConfig, schema);
-    } catch (Exception e) {
+      Assert.fail();
+    } catch (IllegalStateException e) {
       Assert.assertEquals(e.getMessage(),
           "Upsert table must use strict replica-group (i.e. 
strictReplicaGroup) based routing");
     }
+
     tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
         .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL))
         .setRoutingConfig(new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
         .setStreamConfigs(streamConfigs).build();
-    try {
-      TableConfigUtils.validateUpsertConfig(tableConfig, schema);
-    } catch (Exception e) {
-      Assert.fail("Should not fail upsert validation");
-    }
+    TableConfigUtils.validateUpsertConfig(tableConfig, schema);
+
     StarTreeIndexConfig starTreeIndexConfig = new 
StarTreeIndexConfig(Lists.newArrayList("myCol"), null, Collections
         .singletonList(new 
AggregationFunctionColumnPair(AggregationFunctionType.COUNT, 
"myCol").toColumnName()), 10);
     tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
@@ -1094,8 +1099,80 @@ public class TableConfigUtilsTest {
         
.setStarTreeIndexConfigs(Lists.newArrayList(starTreeIndexConfig)).setStreamConfigs(streamConfigs).build();
     try {
       TableConfigUtils.validateUpsertConfig(tableConfig, schema);
-    } catch (Exception e) {
+      Assert.fail();
+    } catch (IllegalStateException e) {
       Assert.assertEquals(e.getMessage(), "The upsert table cannot have 
star-tree index.");
     }
   }
+
+  @Test
+  public void testValidatePartialUpsertConfig() {
+    Schema schema =
+        new 
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension("myCol1",
 FieldSpec.DataType.LONG)
+            .addSingleValueDimension("myCol2", FieldSpec.DataType.STRING)
+            .addDateTime("myTimeCol", FieldSpec.DataType.LONG, "1:DAYS:EPOCH", 
"1:DAYS")
+            .setPrimaryKeyColumns(Lists.newArrayList("myCol1")).build();
+
+    Map<String, String> streamConfigs = getStreamConfigs();
+    streamConfigs.put("stream.kafka.consumer.type", "simple");
+    Map<String, UpsertConfig.Strategy> partialUpsertStratgies = new 
HashMap<>();
+    partialUpsertStratgies.put("myCol1", UpsertConfig.Strategy.INCREMENT);
+
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
+        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.PARTIAL, 
partialUpsertStratgies))
+        .setNullHandlingEnabled(false)
+        .setRoutingConfig(new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
+        .setStreamConfigs(streamConfigs).build();
+    try {
+      TableConfigUtils.validatePartialUpsertStrategies(tableConfig, schema);
+      Assert.fail();
+    } catch (IllegalStateException e) {
+      Assert.assertEquals(e.getMessage(), "Null handling must be enabled for 
partial upsert tables");
+    }
+
+    tableConfig.getIndexingConfig().setNullHandlingEnabled(true);
+    try {
+      TableConfigUtils.validatePartialUpsertStrategies(tableConfig, schema);
+      Assert.fail();
+    } catch (IllegalStateException e) {
+      Assert.assertEquals(e.getMessage(), "Merger cannot be applied to primary 
key columns");
+    }
+
+    partialUpsertStratgies.clear();
+    partialUpsertStratgies.put("randomCol", UpsertConfig.Strategy.OVERWRITE);
+    try {
+      TableConfigUtils.validatePartialUpsertStrategies(tableConfig, schema);
+      Assert.fail();
+    } catch (IllegalStateException e) {
+      Assert.assertEquals(e.getMessage(), "Merger cannot be applied to 
non-existing column: randomCol");
+    }
+
+    partialUpsertStratgies.clear();
+    partialUpsertStratgies.put("myCol2", UpsertConfig.Strategy.INCREMENT);
+    try {
+      TableConfigUtils.validatePartialUpsertStrategies(tableConfig, schema);
+      Assert.fail();
+    } catch (IllegalStateException e) {
+      Assert.assertEquals(e.getMessage(), "INCREMENT merger cannot be applied 
to non-numeric column: myCol2");
+    }
+
+    partialUpsertStratgies.clear();
+    partialUpsertStratgies.put("myTimeCol", UpsertConfig.Strategy.INCREMENT);
+    try {
+      TableConfigUtils.validatePartialUpsertStrategies(tableConfig, schema);
+      Assert.fail();
+    } catch (Exception e) {
+      Assert.assertEquals(e.getMessage(), "INCREMENT merger cannot be applied 
to date time column: myTimeCol");
+    }
+  }
+
+  private Map<String, String> getStreamConfigs() {
+    Map<String, String> streamConfigs = new HashMap<>();
+    streamConfigs.put("streamType", "kafka");
+    streamConfigs.put("stream.kafka.consumer.type", "highLevel");
+    streamConfigs.put("stream.kafka.topic.name", "test");
+    streamConfigs
+        .put("stream.kafka.decoder.class.name", 
"org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder");
+    return streamConfigs;
+  }
 }
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/IndexSegment.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/IndexSegment.java
index 09ad60f..da2517d 100644
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/IndexSegment.java
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/IndexSegment.java
@@ -22,7 +22,7 @@ import java.util.List;
 import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.pinot.segment.spi.datasource.DataSource;
-import org.apache.pinot.segment.spi.index.reader.ValidDocIndexReader;
+import org.apache.pinot.segment.spi.index.ThreadSafeMutableRoaringBitmap;
 import org.apache.pinot.segment.spi.index.startree.StarTreeV2;
 import org.apache.pinot.spi.annotations.InterfaceAudience;
 import org.apache.pinot.spi.data.readers.GenericRow;
@@ -74,7 +74,7 @@ public interface IndexSegment {
 
   // TODO(upsert): solve the coordination problems of getting validDoc across 
segments for result consistency
   @Nullable
-  ValidDocIndexReader getValidDocIndex();
+  ThreadSafeMutableRoaringBitmap getValidDocIds();
 
   /**
    * Returns the record for the given document Id. Virtual column values are 
not returned.
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/ThreadSafeMutableRoaringBitmap.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ThreadSafeMutableRoaringBitmap.java
similarity index 96%
rename from 
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/ThreadSafeMutableRoaringBitmap.java
rename to 
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ThreadSafeMutableRoaringBitmap.java
index 6875e73..3e008fb 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/ThreadSafeMutableRoaringBitmap.java
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ThreadSafeMutableRoaringBitmap.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.segment.local.realtime.impl;
+package org.apache.pinot.segment.spi.index;
 
 import org.roaringbitmap.buffer.MutableRoaringBitmap;
 
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
index 4c138a7..cf3e841 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
@@ -21,6 +21,8 @@ package org.apache.pinot.spi.config.table;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.Map;
 import org.apache.pinot.spi.config.BaseJsonConfig;
 
 
@@ -30,16 +32,38 @@ public class UpsertConfig extends BaseJsonConfig {
     FULL, PARTIAL, NONE
   }
 
+  public enum Strategy {
+    // Todo: add APPEND, CUSTOM strategies
+    OVERWRITE, INCREMENT
+  }
+
   private final Mode _mode;
+  private final Map<String, Strategy> _partialUpsertStrategies;
 
-  @JsonCreator
   public UpsertConfig(@JsonProperty(value = "mode", required = true) Mode 
mode) {
     Preconditions.checkArgument(mode != null, "Upsert mode must be 
configured");
-    Preconditions.checkArgument(mode != Mode.PARTIAL, "Partial upsert mode is 
not supported");
     _mode = mode;
+    _partialUpsertStrategies = null;
+  }
+
+  @JsonCreator
+  public UpsertConfig(@JsonProperty(value = "mode", required = true) Mode mode,
+      @JsonProperty(value = "partialUpsertStrategies") Map<String, Strategy> 
partialUpsertStrategies) {
+    Preconditions.checkArgument(mode != null, "Upsert mode must be 
configured");
+    _mode = mode;
+
+    if (mode == Mode.PARTIAL) {
+      _partialUpsertStrategies = partialUpsertStrategies != null ? 
partialUpsertStrategies : new HashMap<>();
+    } else {
+      _partialUpsertStrategies = null;
+    }
   }
 
   public Mode getMode() {
     return _mode;
   }
+
+  public Map<String, Strategy> getPartialUpsertStrategies() {
+    return _partialUpsertStrategies;
+  }
 }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java
index b5b9eb2..a496cf2 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java
@@ -204,6 +204,13 @@ public class GenericRow implements Serializable {
   }
 
   /**
+   * Marks a field as {@code non-null}.
+   */
+  public void removeNullValueField(String fieldName) {
+    _nullValueFields.remove(fieldName);
+  }
+
+  /**
    * Removes all the fields from the row.
    */
   public void clear() {
@@ -223,7 +230,8 @@ public class GenericRow implements Serializable {
     }
     if (obj instanceof GenericRow) {
       GenericRow that = (GenericRow) obj;
-      return _nullValueFields.equals(that._nullValueFields) && 
EqualityUtils.isEqual(_fieldToValueMap, that._fieldToValueMap);
+      return _nullValueFields.equals(that._nullValueFields) && EqualityUtils
+          .isEqual(_fieldToValueMap, that._fieldToValueMap);
     }
     return false;
   }
diff --git 
a/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/UpsertConfigTest.java
 
b/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/UpsertConfigTest.java
index 9664a8e..2b20849 100644
--- 
a/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/UpsertConfigTest.java
+++ 
b/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/UpsertConfigTest.java
@@ -18,25 +18,23 @@
  */
 package org.apache.pinot.spi.config.table;
 
+import java.util.HashMap;
+import java.util.Map;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.fail;
 
 
 public class UpsertConfigTest {
 
   @Test
   public void testUpsertConfig() {
-    UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
-    assertEquals(upsertConfig.getMode(), UpsertConfig.Mode.FULL);
+    UpsertConfig upsertConfig1 = new UpsertConfig(UpsertConfig.Mode.FULL);
+    assertEquals(upsertConfig1.getMode(), UpsertConfig.Mode.FULL);
 
-    // Test illegal arguments
-    try {
-      new UpsertConfig(UpsertConfig.Mode.PARTIAL);
-      fail();
-    } catch (IllegalArgumentException e) {
-      // Expected
-    }
+    Map<String, UpsertConfig.Strategy> partialUpsertStratgies = new 
HashMap<>();
+    partialUpsertStratgies.put("myCol", UpsertConfig.Strategy.INCREMENT);
+    UpsertConfig upsertConfig2 = new UpsertConfig(UpsertConfig.Mode.PARTIAL, 
partialUpsertStratgies);
+    assertEquals(upsertConfig2.getPartialUpsertStrategies(), 
partialUpsertStratgies);
   }
 }
diff --git a/pinot-tools/pom.xml b/pinot-tools/pom.xml
index 323de82..51c8082 100644
--- a/pinot-tools/pom.xml
+++ b/pinot-tools/pom.xml
@@ -436,6 +436,17 @@
               </jvmSettings>
             </program>
             <program>
+              
<mainClass>org.apache.pinot.tools.PartialUpsertQuickStart</mainClass>
+              <name>quick-start-partial-upsert-streaming</name>
+              <jvmSettings>
+                <initialMemorySize>1G</initialMemorySize>
+                <maxMemorySize>1G</maxMemorySize>
+                <extraArguments>
+                  
<extraArgument>-Dlog4j2.configurationFile=conf/quickstart-log4j2.xml</extraArgument>
+                </extraArguments>
+              </jvmSettings>
+            </program>
+            <program>
               <mainClass>org.apache.pinot.tools.JsonIndexQuickStart</mainClass>
               <name>quick-start-json-index-batch</name>
               <jvmSettings>
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/PartialUpsertQuickStart.java 
b/pinot-tools/src/main/java/org/apache/pinot/tools/PartialUpsertQuickStart.java
new file mode 100644
index 0000000..b77cd5d
--- /dev/null
+++ 
b/pinot-tools/src/main/java/org/apache/pinot/tools/PartialUpsertQuickStart.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tools;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.net.URL;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.utils.ZkStarter;
+import org.apache.pinot.spi.plugin.PluginManager;
+import org.apache.pinot.spi.stream.StreamDataProvider;
+import org.apache.pinot.spi.stream.StreamDataServerStartable;
+import org.apache.pinot.tools.Quickstart.Color;
+import org.apache.pinot.tools.admin.command.QuickstartRunner;
+import org.apache.pinot.tools.streams.MeetupRsvpStream;
+import org.apache.pinot.tools.utils.KafkaStarterUtils;
+
+import static org.apache.pinot.tools.Quickstart.prettyPrintResponse;
+import static org.apache.pinot.tools.Quickstart.printStatus;
+
+
/**
 * Quickstart that demonstrates partial upsert on the meetupRsvp stream: documents share a
 * primary key, and the {@code rsvp_count} column is merged with the INCREMENT strategy
 * (see upsert_partial_meetupRsvp_realtime_table_config.json), so each PK keeps one visible
 * document whose rsvp_count grows as updates arrive.
 */
public class PartialUpsertQuickStart {
  // Embedded Kafka broker for the demo; started in execute() and stopped by the shutdown hook.
  private StreamDataServerStartable _kafkaStarter;

  public static void main(String[] args)
      throws Exception {
    // Initialize Pinot plugins (Kafka consumer factory, decoder, ...) before starting anything.
    PluginManager.get().init();
    new PartialUpsertQuickStart().execute();
  }

  // Todo: add a quick start demo
  /**
   * Runs the end-to-end demo: copies the bundled meetupRsvp schema and PARTIAL-mode upsert
   * table config into a temp workspace, starts Zookeeper, Kafka, and a single
   * controller/broker/server, publishes meetup RSVP events, bootstraps the table, waits for
   * data, and prints one sample aggregation query grouped by the primary key.
   *
   * @throws Exception if any component fails to start or the query fails
   */
  public void execute()
      throws Exception {
    // Unique temp workspace per run (timestamp-named); removed again by the shutdown hook.
    File quickstartTmpDir = new File(FileUtils.getTempDirectory(), String.valueOf(System.currentTimeMillis()));
    File bootstrapTableDir = new File(quickstartTmpDir, "meetupRsvp");
    File dataDir = new File(bootstrapTableDir, "data");
    Preconditions.checkState(dataDir.mkdirs());

    File schemaFile = new File(bootstrapTableDir, "meetupRsvp_schema.json");
    File tableConfigFile = new File(bootstrapTableDir, "meetupRsvp_realtime_table_config.json");

    // Copy the packaged upsert schema and the partial-upsert table config out of the jar
    // resources into the bootstrap dir that QuickstartRunner will consume.
    ClassLoader classLoader = Quickstart.class.getClassLoader();
    URL resource = classLoader.getResource("examples/stream/meetupRsvp/upsert_meetupRsvp_schema.json");
    Preconditions.checkNotNull(resource);
    FileUtils.copyURLToFile(resource, schemaFile);
    resource =
        classLoader.getResource("examples/stream/meetupRsvp/upsert_partial_meetupRsvp_realtime_table_config.json");
    Preconditions.checkNotNull(resource);
    FileUtils.copyURLToFile(resource, tableConfigFile);

    QuickstartTableRequest request = new QuickstartTableRequest(bootstrapTableDir.getAbsolutePath());
    // Single-node cluster: 1 controller, 1 broker, 1 server.
    final QuickstartRunner runner = new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, dataDir);

    printStatus(Color.CYAN, "***** Starting Kafka *****");
    final ZkStarter.ZookeeperInstance zookeeperInstance = ZkStarter.startLocalZkServer();
    try {
      _kafkaStarter = StreamDataProvider.getServerDataStartable(KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME,
          KafkaStarterUtils.getDefaultKafkaConfiguration());
    } catch (Exception e) {
      // Preserve the cause so startup failures (missing plugin, port conflict) are diagnosable.
      throw new RuntimeException("Failed to start " + KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME, e);
    }
    _kafkaStarter.start();
    // NOTE(review): topic is created with 2 partitions — presumably to exercise partitioned
    // consumption alongside the strictReplicaGroup routing in the table config; confirm.
    _kafkaStarter.createTopic("meetupRSVPEvents", KafkaStarterUtils.getTopicCreationProps(2));
    printStatus(Color.CYAN, "***** Starting  meetup data stream and publishing to Kafka *****");
    // NOTE(review): the boolean ctor flag presumably enables key-partitioned publishing —
    // confirm against MeetupRsvpStream before relying on it.
    MeetupRsvpStream meetupRSVPProvider = new MeetupRsvpStream(true);
    meetupRSVPProvider.run();
    printStatus(Color.CYAN, "***** Starting Zookeeper, controller, server and broker *****");
    runner.startAll();
    // Best-effort cleanup on Ctrl-C / JVM exit: stop Pinot, the publisher, Kafka and ZK,
    // then delete the temp workspace. Failures are only printed — this is a demo tool.
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
      try {
        printStatus(Color.GREEN, "***** Shutting down realtime quick start *****");
        runner.stop();
        meetupRSVPProvider.stopPublishing();
        _kafkaStarter.stop();
        ZkStarter.stopLocalZkServer(zookeeperInstance);
        FileUtils.deleteDirectory(quickstartTmpDir);
      } catch (Exception e) {
        e.printStackTrace();
      }
    }));
    printStatus(Color.CYAN, "***** Bootstrap meetupRSVP(upsert) table *****");
    runner.bootstrapTable();
    printStatus(Color.CYAN, "***** Waiting for 15 seconds for a few events to get populated *****");
    Thread.sleep(15000);

    printStatus(Color.YELLOW, "***** Upsert quickstart setup complete *****");
    printStatus(Color.YELLOW, "***** The expected behavior for total number of documents per PK should be 1 *****");
    printStatus(Color.YELLOW,
        "***** The expected behavior for total number of rsvp_counts per PK should >=1 since it's incremented and updated. *****");

    // The expected behavior for total number of documents per PK should be 1.
    // The expected behavior for total number of rsvp_counts per PK should >=1 since it's incremented and updated.
    String q1 =
        "select event_id, count(*), sum(rsvp_count) from meetupRsvp group by event_id order by sum(rsvp_count) desc limit 10";
    printStatus(Color.YELLOW, "Total number of documents, total number of rsvp_counts per event_id in the table");
    printStatus(Color.CYAN, "Query : " + q1);
    printStatus(Color.YELLOW, prettyPrintResponse(runner.runQuery(q1)));
    printStatus(Color.GREEN, "***************************************************");

    printStatus(Color.GREEN, "You can always go to http://localhost:9000 to play around in the query console");
  }
}
diff --git 
a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_partial_meetupRsvp_realtime_table_config.json
 
b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_partial_meetupRsvp_realtime_table_config.json
new file mode 100644
index 0000000..62eb14a
--- /dev/null
+++ 
b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_partial_meetupRsvp_realtime_table_config.json
@@ -0,0 +1,53 @@
+{
+  "tableName": "meetupRsvp",
+  "tableType": "REALTIME",
+  "segmentsConfig": {
+    "timeColumnName": "mtime",
+    "timeType": "MILLISECONDS",
+    "retentionTimeUnit": "DAYS",
+    "retentionTimeValue": "1",
+    "segmentPushType": "APPEND",
+    "segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
+    "schemaName": "meetupRsvp",
+    "replicasPerPartition": "1"
+  },
+  "tenants": {},
+  "tableIndexConfig": {
+    "loadMode": "MMAP",
+    "nullHandlingEnabled": true,
+    "streamConfigs": {
+      "streamType": "kafka",
+      "stream.kafka.consumer.type": "lowLevel",
+      "stream.kafka.topic.name": "meetupRSVPEvents",
+      "stream.kafka.decoder.class.name": 
"org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
+      "stream.kafka.hlc.zk.connect.string": "localhost:2191/kafka",
+      "stream.kafka.consumer.factory.class.name": 
"org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
+      "stream.kafka.zk.broker.url": "localhost:2191/kafka",
+      "stream.kafka.broker.list": "localhost:19092",
+      "realtime.segment.flush.threshold.size": 30,
+      "realtime.segment.flush.threshold.rows": 30
+    }
+  },
+  "fieldConfigList": [
+    {
+      "name": "location",
+      "encodingType": "RAW",
+      "indexType": "H3",
+      "properties": {
+        "resolutions": "5"
+      }
+    }
+  ],
+  "metadata": {
+    "customConfigs": {}
+  },
+  "routing": {
+    "instanceSelectorType": "strictReplicaGroup"
+  },
+  "upsertConfig": {
+    "mode": "PARTIAL",
+    "partialUpsertStrategies": {
+      "rsvp_count": "INCREMENT"
+    }
+  }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to