This is an automated email from the ASF dual-hosted git repository.

xbli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new b241d731b7 Track segments consistently for consistent upsert view 
(#13677)
b241d731b7 is described below

commit b241d731b7aec1f0db2bcef86394641bae927372
Author: Xiaobing <[email protected]>
AuthorDate: Fri Aug 16 13:58:25 2024 -0700

    Track segments consistently for consistent upsert view (#13677)
    
    * track segments consistently for consistent upsert view
    
    * add util class UpsertViewManager
    
    * fix a subtle race condition for SNAPSHOT mode
---
 .../core/data/manager/DuoSegmentDataManager.java   | 118 ++++++++
 .../realtime/RealtimeSegmentDataManager.java       |   1 +
 .../manager/realtime/RealtimeTableDataManager.java |  60 +++-
 .../query/executor/ServerQueryExecutorV1Impl.java  | 121 ++++++--
 .../data/manager/DuoSegmentDataManagerTest.java    | 128 ++++++++
 .../local/data/manager/SegmentDataManager.java     |  12 +-
 .../immutable/ImmutableSegmentImpl.java            |   3 +
 .../indexsegment/mutable/MutableSegmentImpl.java   |  49 ++-
 .../local/realtime/impl/RealtimeSegmentConfig.java |  32 +-
 .../upsert/BasePartitionUpsertMetadataManager.java | 245 +++------------
 .../upsert/BaseTableUpsertMetadataManager.java     |   5 +
 ...oncurrentMapPartitionUpsertMetadataManager.java |  11 +-
 ...nUpsertMetadataManagerForConsistentDeletes.java |  11 +-
 .../ConcurrentMapTableUpsertMetadataManager.java   |  61 +++-
 .../upsert/PartitionUpsertMetadataManager.java     |  13 +
 .../local/upsert/TableUpsertMetadataManager.java   |  14 +
 .../pinot/segment/local/upsert/UpsertUtils.java    |  39 ++-
 .../segment/local/upsert/UpsertViewManager.java    | 327 +++++++++++++++++++++
 .../BasePartitionUpsertMetadataManagerTest.java    |  69 +++--
 .../local/upsert/UpsertViewManagerTest.java        |  59 ++++
 .../apache/pinot/spi/config/table/TableConfig.java |   5 +
 21 files changed, 1070 insertions(+), 313 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/DuoSegmentDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/DuoSegmentDataManager.java
new file mode 100644
index 0000000000..fb22e78655
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/DuoSegmentDataManager.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.manager;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
+import org.apache.pinot.segment.spi.IndexSegment;
+
+
+/**
+ * Segment data manager tracking two segments associated with one segment name, e.g. when committing a mutable
+ * segment, a new immutable segment is created to replace the mutable one, and the two segments have the same name.
+ * By tracking both with this segment data manager, queries can be given both segments for a complete data view.
+ * The primary segment data manager stands in for both segments wherever segment metadata is asked for (segment
+ * name, load time, reference count and the single-segment {@link #getSegment()} view).
+ */
+public class DuoSegmentDataManager extends SegmentDataManager {
+  // Manager whose metadata represents this pair; it is also the first entry of _segmentDataManagers.
+  private final SegmentDataManager _primary;
+  // Fixed-size list [primary, secondary]; every per-segment operation below iterates it in this order.
+  private final List<SegmentDataManager> _segmentDataManagers;
+
+  /**
+   * @param primary manager reported for metadata lookups (RealtimeTableDataManager passes the new immutable
+   *                segment's manager here)
+   * @param secondary other manager sharing the same segment name (RealtimeTableDataManager passes the old segment's
+   *                  manager here)
+   */
+  public DuoSegmentDataManager(SegmentDataManager primary, SegmentDataManager 
secondary) {
+    _primary = primary;
+    _segmentDataManagers = Arrays.asList(_primary, secondary);
+  }
+
+  /** Returns the load time of the primary manager. */
+  @Override
+  public long getLoadTimeMs() {
+    return _primary.getLoadTimeMs();
+  }
+
+  /** Returns the reference count of the primary manager only; the secondary's count is not reflected here. */
+  @Override
+  public synchronized int getReferenceCount() {
+    return _primary.getReferenceCount();
+  }
+
+  /** Returns the segment name of the primary manager (both tracked segments share this name). */
+  @Override
+  public String getSegmentName() {
+    return _primary.getSegmentName();
+  }
+
+  /** Returns the primary segment only; use {@link #getSegments()} to access both tracked segments. */
+  @Override
+  public IndexSegment getSegment() {
+    return _primary.getSegment();
+  }
+
+  /**
+   * Tries to increase the reference count of every tracked manager, primary first.
+   *
+   * @return true if at least one tracked manager's increaseReferenceCount() returned true
+   */
+  @Override
+  public synchronized boolean increaseReferenceCount() {
+    // NOTE(review): a partial success is not rolled back, yet decreaseReferenceCount() below decreases every
+    // tracked manager unconditionally; confirm SegmentDataManager tolerates a decrease without a matching increase.
+    boolean any = false;
+    for (SegmentDataManager segmentDataManager : _segmentDataManagers) {
+      if (segmentDataManager.increaseReferenceCount()) {
+        any = true;
+      }
+    }
+    return any;
+  }
+
+  /**
+   * Decreases the reference count of every tracked manager, primary first.
+   *
+   * @return true if at least one tracked manager's decreaseReferenceCount() returned true
+   */
+  @Override
+  public synchronized boolean decreaseReferenceCount() {
+    boolean any = false;
+    for (SegmentDataManager segmentDataManager : _segmentDataManagers) {
+      if (segmentDataManager.decreaseReferenceCount()) {
+        any = true;
+      }
+    }
+    return any;
+  }
+
+  /** Always true: this manager tracks two segments. */
+  @Override
+  public boolean hasMultiSegments() {
+    return true;
+  }
+
+  /**
+   * Returns the segments of the tracked managers whose reference count is still positive, in [primary, secondary]
+   * order. A manager whose count has dropped to zero is left out.
+   */
+  @Override
+  public List<IndexSegment> getSegments() {
+    List<IndexSegment> segments = new ArrayList<>(_segmentDataManagers.size());
+    for (SegmentDataManager segmentDataManager : _segmentDataManagers) {
+      if (segmentDataManager.getReferenceCount() > 0) {
+        segments.add(segmentDataManager.getSegment());
+      }
+    }
+    return segments;
+  }
+
+  /** Calls offload() on each tracked manager whose reference count is zero; others are left untouched. */
+  @Override
+  public void doOffload() {
+    for (SegmentDataManager segmentDataManager : _segmentDataManagers) {
+      if (segmentDataManager.getReferenceCount() == 0) {
+        segmentDataManager.offload();
+      }
+    }
+  }
+
+  /** Calls destroy() on each tracked manager whose reference count is zero; others are left untouched. */
+  @Override
+  protected void doDestroy() {
+    for (SegmentDataManager segmentDataManager : _segmentDataManagers) {
+      if (segmentDataManager.getReferenceCount() == 0) {
+        segmentDataManager.destroy();
+      }
+    }
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index b04bf4a36e..db9c63770a 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -1565,6 +1565,7 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
             
.setIngestionAggregationConfigs(IngestionConfigUtils.getAggregationConfigs(tableConfig))
             .setNullHandlingEnabled(_nullHandlingEnabled)
             
.setConsumerDir(consumerDir).setUpsertMode(tableConfig.getUpsertMode())
+            .setUpsertConsistencyMode(tableConfig.getUpsertConsistencyMode())
             .setPartitionUpsertMetadataManager(partitionUpsertMetadataManager)
             
.setUpsertComparisonColumns(tableConfig.getUpsertComparisonColumns())
             
.setUpsertDeleteRecordColumn(tableConfig.getUpsertDeleteRecordColumn())
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 8a99610c75..aa4e061204 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -49,6 +49,7 @@ import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.common.utils.SegmentUtils;
 import org.apache.pinot.common.utils.config.QueryOptionsUtils;
 import org.apache.pinot.core.data.manager.BaseTableDataManager;
+import org.apache.pinot.core.data.manager.DuoSegmentDataManager;
 import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
 import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
 import org.apache.pinot.segment.local.dedup.PartitionDedupMetadataManager;
@@ -98,7 +99,6 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
   // The semaphores will stay in the hash map even if the consuming partitions 
move to a different host.
   // We expect that there will be a small number of semaphores, but that may 
be ok.
   private final Map<Integer, Semaphore> _partitionGroupIdToSemaphoreMap = new 
ConcurrentHashMap<>();
-
   // The old name of the stats file used to be stats.ser which we changed when 
we moved all packages
   // from com.linkedin to org.apache because of not being able to deserialize 
the old files using the newer classes
   private static final String STATS_FILE_NAME = "segment-stats.ser";
@@ -507,9 +507,9 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
         new RealtimeSegmentDataManager(zkMetadata, tableConfig, this, 
_indexDir.getAbsolutePath(), indexLoadingConfig,
             schema, llcSegmentName, semaphore, _serverMetrics, 
partitionUpsertMetadataManager,
             partitionDedupMetadataManager, _isTableReadyToConsumeData);
+    registerSegment(segmentName, realtimeSegmentDataManager, 
partitionUpsertMetadataManager);
     realtimeSegmentDataManager.startConsumption();
     _serverMetrics.addValueToTableGauge(_tableNameWithType, 
ServerGauge.SEGMENT_COUNT, 1);
-    registerSegment(segmentName, realtimeSegmentDataManager);
 
     _logger.info("Added new CONSUMING segment: {}", segmentName);
   }
@@ -603,7 +603,7 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
       // Preloading segment is ensured to be handled by a single thread, so no 
need to take the segment upsert lock.
       // Besides, preloading happens before the table partition is made ready 
for any queries.
       partitionUpsertMetadataManager.preloadSegment(immutableSegment);
-      registerSegment(segmentName, newSegmentManager);
+      registerSegment(segmentName, newSegmentManager, 
partitionUpsertMetadataManager);
       _logger.info("Preloaded immutable segment: {} with upsert enabled", 
segmentName);
       return;
     }
@@ -614,24 +614,52 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
       // segments may be invalidated, making the queries see less valid docs 
than expected. We should let query
       // access the new segment asap even though its validDocId bitmap is 
still being filled by
       // partitionUpsertMetadataManager.
-      registerSegment(segmentName, newSegmentManager);
+      registerSegment(segmentName, newSegmentManager, 
partitionUpsertMetadataManager);
       partitionUpsertMetadataManager.addSegment(immutableSegment);
       _logger.info("Added new immutable segment: {} with upsert enabled", 
segmentName);
     } else {
-      // When replacing a segment, we should register the new segment 'after' 
it is fully initialized by
-      // partitionUpsertMetadataManager to fill up its validDocId bitmap. 
Otherwise, the queries will lose the access
-      // to the valid docs in the old segment immediately, but the validDocId 
bitmap of the new segment is still
-      // being filled by partitionUpsertMetadataManager, making the queries 
see less valid docs than expected.
-      // When replacing a segment, the new and old segments are assumed to 
have same set of valid docs for data
-      // consistency, otherwise the new segment should be named differently to 
go through the addSegment flow above.
-      IndexSegment oldSegment = oldSegmentManager.getSegment();
+      replaceUpsertSegment(segmentName, oldSegmentManager, newSegmentManager, 
partitionUpsertMetadataManager);
+    }
+  }
+
+  private void replaceUpsertSegment(String segmentName, SegmentDataManager 
oldSegmentManager,
+      ImmutableSegmentDataManager newSegmentManager, 
PartitionUpsertMetadataManager partitionUpsertMetadataManager) {
+    // When replacing a segment, we should register the new segment 'after' it 
is fully initialized by
+    // partitionUpsertMetadataManager to fill up its validDocId bitmap. 
Otherwise, the queries will lose the access
+    // to the valid docs in the old segment immediately, but the validDocId 
bitmap of the new segment is still
+    // being filled by partitionUpsertMetadataManager, making the queries see 
less valid docs than expected.
+    IndexSegment oldSegment = oldSegmentManager.getSegment();
+    ImmutableSegment immutableSegment = newSegmentManager.getSegment();
+    UpsertConfig.ConsistencyMode consistencyMode = 
_tableUpsertMetadataManager.getUpsertConsistencyMode();
+    if (consistencyMode == UpsertConfig.ConsistencyMode.NONE) {
       partitionUpsertMetadataManager.replaceSegment(immutableSegment, 
oldSegment);
-      registerSegment(segmentName, newSegmentManager);
-      _logger.info("Replaced {} segment: {} with upsert enabled",
-          oldSegment instanceof ImmutableSegment ? "immutable" : "mutable", 
segmentName);
-      oldSegmentManager.offload();
-      releaseSegment(oldSegmentManager);
+      registerSegment(segmentName, newSegmentManager, 
partitionUpsertMetadataManager);
+    } else {
+      // By default, when replacing a segment, the old segment is kept intact 
and visible to query until the new
+      // segment is registered as in the if-branch above. But the newly 
ingested records will invalidate valid
+      // docs in the new segment as the upsert metadata gets updated during 
replacement, so the query will miss the
+      // new updates in the new segment, until it's registered after the 
replacement is done.
+      // For consistent data view, we make both old and new segment visible to 
the query and update both in place
+      // when segment replacement and new data ingestion are happening in 
parallel.
+      SegmentDataManager duoSegmentDataManager = new 
DuoSegmentDataManager(newSegmentManager, oldSegmentManager);
+      registerSegment(segmentName, duoSegmentDataManager, 
partitionUpsertMetadataManager);
+      partitionUpsertMetadataManager.replaceSegment(immutableSegment, 
oldSegment);
+      registerSegment(segmentName, newSegmentManager, 
partitionUpsertMetadataManager);
+    }
+    _logger.info("Replaced {} segment: {} with upsert enabled and consistency 
mode: {}",
+        oldSegment instanceof ImmutableSegment ? "immutable" : "mutable", 
segmentName, consistencyMode);
+    oldSegmentManager.offload();
+    releaseSegment(oldSegmentManager);
+  }
+
+  /**
+   * Registers the segment data manager with this table. When a partition upsert metadata manager is given, the
+   * segment is first handed to it via trackSegmentForUpsertView().
+   *
+   * @param segmentName name under which the segment data manager is registered
+   * @param segmentDataManager manager to register
+   * @param partitionUpsertMetadataManager upsert metadata manager of the segment's partition, or null to skip
+   *                                       upsert-view tracking
+   */
+  private void registerSegment(String segmentName, SegmentDataManager 
segmentDataManager,
+      @Nullable PartitionUpsertMetadataManager partitionUpsertMetadataManager) 
{
+    if (partitionUpsertMetadataManager != null) {
+      // Register segment to the upsert metadata manager before registering it to table manager, so that the upsert
+      // metadata manager can update the upsert view before the segment becomes visible to queries.
+      // NOTE(review): only getSegment() is tracked; for a multi-segment manager (e.g. DuoSegmentDataManager) that
+      // is just the primary segment -- confirm the secondary is already tracked elsewhere.
+      
partitionUpsertMetadataManager.trackSegmentForUpsertView(segmentDataManager.getSegment());
     }
+    registerSegment(segmentName, segmentDataManager);
   }
 
   /**
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
index 8c2906db54..7253a8abfc 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
@@ -67,12 +68,14 @@ import org.apache.pinot.core.query.utils.idset.IdSet;
 import org.apache.pinot.core.util.trace.TraceContext;
 import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.segment.local.upsert.TableUpsertMetadataManager;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
 import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.MutableSegment;
 import org.apache.pinot.segment.spi.SegmentContext;
 import org.apache.pinot.segment.spi.SegmentMetadata;
+import org.apache.pinot.spi.config.table.UpsertConfig;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.exception.BadQueryRequestException;
 import org.apache.pinot.spi.exception.QueryCancelledException;
@@ -203,18 +206,63 @@ public class ServerQueryExecutorV1Impl implements 
QueryExecutor {
     List<String> segmentsToQuery = queryRequest.getSegmentsToQuery();
     List<String> optionalSegments = queryRequest.getOptionalSegments();
     List<String> notAcquiredSegments = new ArrayList<>();
-    List<SegmentDataManager> segmentDataManagers =
-        tableDataManager.acquireSegments(segmentsToQuery, optionalSegments, 
notAcquiredSegments);
-    int numSegmentsAcquired = segmentDataManagers.size();
+    int numSegmentsAcquired;
+    List<SegmentDataManager> segmentDataManagers;
+    List<IndexSegment> indexSegments;
+    Map<IndexSegment, SegmentContext> providedSegmentContexts = null;
+    if (!isUpsertTableUsingConsistencyMode(tableDataManager)) {
+      segmentDataManagers = tableDataManager.acquireSegments(segmentsToQuery, 
optionalSegments, notAcquiredSegments);
+      numSegmentsAcquired = segmentDataManagers.size();
+      indexSegments = new ArrayList<>(numSegmentsAcquired);
+      for (SegmentDataManager segmentDataManager : segmentDataManagers) {
+        indexSegments.add(segmentDataManager.getSegment());
+      }
+    } else {
+      RealtimeTableDataManager rtdm = (RealtimeTableDataManager) 
tableDataManager;
+      TableUpsertMetadataManager tumm = rtdm.getTableUpsertMetadataManager();
+      tumm.lockForSegmentContexts();
+      try {
+        // Server can start consuming segment before broker can update its 
routing table upon IdealState changes, so
+        // broker can miss the new consuming segment even with the previous 
fix #11978. For a complete upsert data view,
+        // the consuming segment should be acquired all the time speculatively 
if it's not included by the broker.
+        Set<String> allSegmentsToQuery = new HashSet<>(segmentsToQuery);
+        if (optionalSegments == null) {
+          optionalSegments = new ArrayList<>();
+        } else {
+          allSegmentsToQuery.addAll(optionalSegments);
+        }
+        for (String segmentName : tumm.getOptionalSegments()) {
+          if (!allSegmentsToQuery.contains(segmentName)) {
+            optionalSegments.add(segmentName);
+          }
+        }
+        segmentDataManagers = 
tableDataManager.acquireSegments(segmentsToQuery, optionalSegments, 
notAcquiredSegments);
+        numSegmentsAcquired = segmentDataManagers.size();
+        indexSegments = new ArrayList<>(numSegmentsAcquired);
+        for (SegmentDataManager segmentDataManager : segmentDataManagers) {
+          // When using consistency mode, a segment data manager may track two 
segments with same name, and they both
+          // contain part of the valid docs for the segment.
+          if (segmentDataManager.hasMultiSegments()) {
+            indexSegments.addAll(segmentDataManager.getSegments());
+          } else {
+            indexSegments.add(segmentDataManager.getSegment());
+          }
+        }
+        List<SegmentContext> segmentContexts =
+            tableDataManager.getSegmentContexts(indexSegments, 
queryContext.getQueryOptions());
+        providedSegmentContexts = new HashMap<>(segmentContexts.size());
+        for (SegmentContext sc : segmentContexts) {
+          providedSegmentContexts.put(sc.getIndexSegment(), sc);
+        }
+      } finally {
+        tumm.unlockForSegmentContexts();
+      }
+    }
     if (LOGGER.isDebugEnabled()) {
       LOGGER.debug("Processing requestId: {} with segmentsToQuery: {}, 
optionalSegments: {} and acquiredSegments: {}",
           requestId, segmentsToQuery, optionalSegments,
           
segmentDataManagers.stream().map(SegmentDataManager::getSegmentName).collect(Collectors.toList()));
     }
-    List<IndexSegment> indexSegments = new ArrayList<>(numSegmentsAcquired);
-    for (SegmentDataManager segmentDataManager : segmentDataManagers) {
-      indexSegments.add(segmentDataManager.getSegment());
-    }
 
     // Gather stats for realtime consuming segments
     // TODO: the freshness time should not be collected at query time because 
there is no guarantee that the consuming
@@ -259,8 +307,8 @@ public class ServerQueryExecutorV1Impl implements 
QueryExecutor {
     InstanceResponseBlock instanceResponse = null;
     try {
       instanceResponse =
-          executeInternal(tableDataManager, indexSegments, queryContext, 
timerContext, executorService, streamer,
-              queryRequest.isEnableStreaming());
+          executeInternal(tableDataManager, indexSegments, 
providedSegmentContexts, queryContext, timerContext,
+              executorService, streamer, queryRequest.isEnableStreaming());
     } catch (Exception e) {
       _serverMetrics.addMeteredTableValue(tableNameWithType, 
ServerMeter.QUERY_EXECUTION_EXCEPTIONS, 1);
       instanceResponse = new InstanceResponseBlock();
@@ -346,12 +394,27 @@ public class ServerQueryExecutorV1Impl implements 
QueryExecutor {
     return instanceResponse;
   }
 
+  /**
+   * Returns true only when the table is a RealtimeTableDataManager with upsert enabled whose upsert consistency
+   * mode is not {@code NONE}; returns false for every other table data manager.
+   */
+  private boolean isUpsertTableUsingConsistencyMode(TableDataManager 
tableDataManager) {
+    // For upsert table using consistency mode, we have to acquire segments 
and get their validDocIds atomically.
+    // Otherwise, the query may not have access to new segments added just 
before it gets validDocIds for the acquired
+    // segments, thus missing valid docs replaced by the newly added segments.
+    if (tableDataManager instanceof RealtimeTableDataManager) {
+      RealtimeTableDataManager rtdm = (RealtimeTableDataManager) 
tableDataManager;
+      if (rtdm.isUpsertEnabled()) {
+        return rtdm.getTableUpsertMetadataManager().getUpsertConsistencyMode() 
!= UpsertConfig.ConsistencyMode.NONE;
+      }
+    }
+    return false;
+  }
+
   // NOTE: This method might change indexSegments. Do not use it after calling 
this method.
   private InstanceResponseBlock executeInternal(TableDataManager 
tableDataManager, List<IndexSegment> indexSegments,
-      QueryContext queryContext, TimerContext timerContext, ExecutorService 
executorService,
-      @Nullable ResultsBlockStreamer streamer, boolean enableStreaming)
+      @Nullable Map<IndexSegment, SegmentContext> providedSegmentContexts, 
QueryContext queryContext,
+      TimerContext timerContext, ExecutorService executorService, @Nullable 
ResultsBlockStreamer streamer,
+      boolean enableStreaming)
       throws Exception {
-    handleSubquery(queryContext, tableDataManager, indexSegments, 
timerContext, executorService);
+    handleSubquery(queryContext, tableDataManager, indexSegments, 
providedSegmentContexts, timerContext,
+        executorService);
 
     // Compute total docs for the table before pruning the segments
     long numTotalDocs = 0;
@@ -382,8 +445,13 @@ public class ServerQueryExecutorV1Impl implements 
QueryExecutor {
       }
     } else {
       TimerContext.Timer planBuildTimer = 
timerContext.startNewPhaseTimer(ServerQueryPhase.BUILD_QUERY_PLAN);
-      List<SegmentContext> selectedSegmentContexts =
-          tableDataManager.getSegmentContexts(selectedSegments, 
queryContext.getQueryOptions());
+      List<SegmentContext> selectedSegmentContexts;
+      if (providedSegmentContexts == null) {
+        selectedSegmentContexts = 
tableDataManager.getSegmentContexts(selectedSegments, 
queryContext.getQueryOptions());
+      } else {
+        selectedSegmentContexts = new ArrayList<>(selectedSegments.size());
+        selectedSegments.forEach(s -> 
selectedSegmentContexts.add(providedSegmentContexts.get(s)));
+      }
       Plan queryPlan =
           enableStreaming ? 
_planMaker.makeStreamingInstancePlan(selectedSegmentContexts, queryContext, 
executorService,
               streamer, _serverMetrics)
@@ -529,11 +597,12 @@ public class ServerQueryExecutorV1Impl implements 
QueryExecutor {
    * <p>Currently only supports subquery within the filter.
    */
   private void handleSubquery(QueryContext queryContext, TableDataManager 
tableDataManager,
-      List<IndexSegment> indexSegments, TimerContext timerContext, 
ExecutorService executorService)
+      List<IndexSegment> indexSegments, @Nullable Map<IndexSegment, 
SegmentContext> providedSegmentContexts,
+      TimerContext timerContext, ExecutorService executorService)
       throws Exception {
     FilterContext filter = queryContext.getFilter();
     if (filter != null && !filter.isConstant()) {
-      handleSubquery(filter, tableDataManager, indexSegments, timerContext, 
executorService,
+      handleSubquery(filter, tableDataManager, indexSegments, 
providedSegmentContexts, timerContext, executorService,
           queryContext.getEndTimeMs());
     }
   }
@@ -543,16 +612,18 @@ public class ServerQueryExecutorV1Impl implements 
QueryExecutor {
    * <p>Currently only supports subquery within the lhs of the predicate.
    */
   private void handleSubquery(FilterContext filter, TableDataManager 
tableDataManager, List<IndexSegment> indexSegments,
-      TimerContext timerContext, ExecutorService executorService, long 
endTimeMs)
+      @Nullable Map<IndexSegment, SegmentContext> providedSegmentContexts, 
TimerContext timerContext,
+      ExecutorService executorService, long endTimeMs)
       throws Exception {
     List<FilterContext> children = filter.getChildren();
     if (children != null) {
       for (FilterContext child : children) {
-        handleSubquery(child, tableDataManager, indexSegments, timerContext, 
executorService, endTimeMs);
+        handleSubquery(child, tableDataManager, indexSegments, 
providedSegmentContexts, timerContext, executorService,
+            endTimeMs);
       }
     } else {
-      handleSubquery(filter.getPredicate().getLhs(), tableDataManager, 
indexSegments, timerContext, executorService,
-          endTimeMs);
+      handleSubquery(filter.getPredicate().getLhs(), tableDataManager, 
indexSegments, providedSegmentContexts,
+          timerContext, executorService, endTimeMs);
     }
   }
 
@@ -564,7 +635,8 @@ public class ServerQueryExecutorV1Impl implements 
QueryExecutor {
    * rewritten to an IN_ID_SET transform function.
    */
   private void handleSubquery(ExpressionContext expression, TableDataManager 
tableDataManager,
-      List<IndexSegment> indexSegments, TimerContext timerContext, 
ExecutorService executorService, long endTimeMs)
+      List<IndexSegment> indexSegments, @Nullable Map<IndexSegment, 
SegmentContext> providedSegmentContexts,
+      TimerContext timerContext, ExecutorService executorService, long 
endTimeMs)
       throws Exception {
     FunctionContext function = expression.getFunction();
     if (function == null) {
@@ -591,8 +663,8 @@ public class ServerQueryExecutorV1Impl implements 
QueryExecutor {
       subquery.setEndTimeMs(endTimeMs);
       // Make a clone of indexSegments because the method might modify the list
       InstanceResponseBlock instanceResponse =
-          executeInternal(tableDataManager, new ArrayList<>(indexSegments), 
subquery, timerContext, executorService,
-              null, false);
+          executeInternal(tableDataManager, new ArrayList<>(indexSegments), 
providedSegmentContexts, subquery,
+              timerContext, executorService, null, false);
       BaseResultsBlock resultsBlock = instanceResponse.getResultsBlock();
       Preconditions.checkState(resultsBlock instanceof AggregationResultsBlock,
           "Got unexpected results block type: %s, expecting aggregation 
results",
@@ -605,7 +677,8 @@ public class ServerQueryExecutorV1Impl implements 
QueryExecutor {
       arguments.set(1, 
ExpressionContext.forLiteral(RequestUtils.getLiteral(((IdSet) 
result).toBase64String())));
     } else {
       for (ExpressionContext argument : arguments) {
-        handleSubquery(argument, tableDataManager, indexSegments, 
timerContext, executorService, endTimeMs);
+        handleSubquery(argument, tableDataManager, indexSegments, 
providedSegmentContexts, timerContext,
+            executorService, endTimeMs);
       }
     }
   }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/DuoSegmentDataManagerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/DuoSegmentDataManagerTest.java
new file mode 100644
index 0000000000..acc030cfe6
--- /dev/null
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/DuoSegmentDataManagerTest.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.manager;
+
+import java.util.Arrays;
+import java.util.Collections;
+import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
+import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
+import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.MutableSegment;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.*;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertSame;
+import static org.testng.Assert.assertTrue;
+
+/** Unit tests for {@link DuoSegmentDataManager}, driven entirely by Mockito mocks of the two wrapped managers. */
+public class DuoSegmentDataManagerTest {
+  @Test
+  public void testGetSegments() {
+    SegmentDataManager sdm1 = mockSegmentDataManager("seg01", false, 1);
+    SegmentDataManager sdm2 = mockSegmentDataManager("seg01", true, 1);
+    DuoSegmentDataManager dsdm = new DuoSegmentDataManager(sdm1, sdm2);
+    // sdm1 wraps an immutable segment and sdm2 a mutable one; both start with reference count 1.
+    assertTrue(dsdm.hasMultiSegments());
+    assertSame(dsdm.getSegment(), sdm1.getSegment());
+    assertEquals(dsdm.getSegments(), Arrays.asList(sdm1.getSegment(), 
sdm2.getSegment()));
+    // sdm1's reference count drops to 0: getSegments() is expected to omit it, getSegment() still returns sdm1's.
+    when(sdm1.getReferenceCount()).thenReturn(0);
+    assertTrue(dsdm.hasMultiSegments());
+    assertSame(dsdm.getSegment(), sdm1.getSegment());
+    assertEquals(dsdm.getSegments(), 
Collections.singletonList(sdm2.getSegment()));
+    // Both reference counts are 0: getSegments() is expected to be empty, hasMultiSegments() still true.
+    when(sdm2.getReferenceCount()).thenReturn(0);
+    assertTrue(dsdm.hasMultiSegments());
+    assertSame(dsdm.getSegment(), sdm1.getSegment());
+    assertTrue(dsdm.getSegments().isEmpty());
+  }
+  // Combined ref-count result: false when both wrapped managers return false, true when sdm1 returns true.
+  @Test
+  public void testIncDecRefCnt() {
+    SegmentDataManager sdm1 = mockSegmentDataManager("seg01", false, 1);
+    SegmentDataManager sdm2 = mockSegmentDataManager("seg01", true, 1);
+    DuoSegmentDataManager dsdm = new DuoSegmentDataManager(sdm1, sdm2);
+    // Neither wrapped manager succeeds: the combined result is expected to be false.
+    when(sdm1.increaseReferenceCount()).thenReturn(false);
+    when(sdm2.increaseReferenceCount()).thenReturn(false);
+    when(sdm1.decreaseReferenceCount()).thenReturn(false);
+    when(sdm2.decreaseReferenceCount()).thenReturn(false);
+    assertFalse(dsdm.increaseReferenceCount());
+    assertFalse(dsdm.decreaseReferenceCount());
+    // Only sdm1 succeeds: the combined result is still expected to be true.
+    when(sdm1.increaseReferenceCount()).thenReturn(true);
+    when(sdm2.increaseReferenceCount()).thenReturn(false);
+    when(sdm1.decreaseReferenceCount()).thenReturn(true);
+    when(sdm2.decreaseReferenceCount()).thenReturn(false);
+    assertTrue(dsdm.increaseReferenceCount());
+    assertTrue(dsdm.decreaseReferenceCount());
+    // Both succeed.
+    when(sdm1.increaseReferenceCount()).thenReturn(true);
+    when(sdm2.increaseReferenceCount()).thenReturn(true);
+    when(sdm1.decreaseReferenceCount()).thenReturn(true);
+    when(sdm2.decreaseReferenceCount()).thenReturn(true);
+    assertTrue(dsdm.increaseReferenceCount());
+    assertTrue(dsdm.decreaseReferenceCount());
+  }
+  // doOffload()/doDestroy() are expected to touch only wrapped managers whose reference count has reached 0.
+  @Test
+  public void testDoOffloadDestroy() {
+    SegmentDataManager sdm1 = mockSegmentDataManager("seg01", false, 1);
+    SegmentDataManager sdm2 = mockSegmentDataManager("seg01", true, 1);
+    DuoSegmentDataManager dsdm = new DuoSegmentDataManager(sdm1, sdm2);
+    // Both reference counts are still 1: nothing should be offloaded or destroyed.
+    dsdm.doOffload();
+    verify(sdm1, times(0)).offload();
+    verify(sdm2, times(0)).offload();
+    dsdm.doDestroy();
+    verify(sdm1, times(0)).destroy();
+    verify(sdm2, times(0)).destroy();
+    // Only sdm1 reaches 0.
+    when(sdm1.getReferenceCount()).thenReturn(0);
+    dsdm.doOffload();
+    verify(sdm1, times(1)).offload();
+    verify(sdm2, times(0)).offload();
+    dsdm.doDestroy();
+    verify(sdm1, times(1)).destroy();
+    verify(sdm2, times(0)).destroy();
+    // sdm2 reaches 0 as well.
+    reset(sdm1); // NOTE(review): also clears sdm1's recorded calls, so times(1) below counts only post-reset calls; sdm1.getReferenceCount() now returns the default 0
+    when(sdm2.getReferenceCount()).thenReturn(0);
+    dsdm.doOffload();
+    verify(sdm1, times(1)).offload();
+    verify(sdm2, times(1)).offload();
+    dsdm.doDestroy();
+    verify(sdm1, times(1)).destroy();
+    verify(sdm2, times(1)).destroy();
+  }
+  // Mocks a realtime (mutable) or immutable segment data manager with stubbed name, segment and reference count.
+  private SegmentDataManager mockSegmentDataManager(String segmentName, 
boolean isMutable, int refCnt) {
+    SegmentDataManager segmentDataManager =
+        isMutable ? mock(RealtimeSegmentDataManager.class) : 
mock(ImmutableSegmentDataManager.class);
+    IndexSegment segment = isMutable ? mock(MutableSegment.class) : 
mock(ImmutableSegment.class);
+    when(segmentDataManager.getSegmentName()).thenReturn(segmentName);
+    when(segmentDataManager.getSegment()).thenReturn(segment);
+    when(segmentDataManager.getReferenceCount()).thenReturn(refCnt);
+    return segmentDataManager;
+  }
+}
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/SegmentDataManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/SegmentDataManager.java
index 77b01a97e8..2edd3c134a 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/SegmentDataManager.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/SegmentDataManager.java
@@ -18,7 +18,8 @@
  */
 package org.apache.pinot.segment.local.data.manager;
 
-import com.google.common.annotations.VisibleForTesting;
+import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.pinot.segment.spi.IndexSegment;
 
@@ -36,7 +37,6 @@ public abstract class SegmentDataManager {
     return _loadTimeMs;
   }
 
-  @VisibleForTesting
   public synchronized int getReferenceCount() {
     return _referenceCount;
   }
@@ -74,6 +74,14 @@ public abstract class SegmentDataManager {
 
   public abstract IndexSegment getSegment();
 
+  public boolean hasMultiSegments() { // base implementation: false; non-final, so wrapping subclasses may override
+    return false;
+  }
+  // Base implementation returns an empty list; presumably callers consult it only when hasMultiSegments() is true — confirm.
+  public List<IndexSegment> getSegments() {
+    return Collections.emptyList();
+  }
+
   /**
    * Offloads the segment from the metadata management (e.g. upsert metadata), 
but not releases the resources yet
    * because there might be queries still accessing the segment.
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
index ff49a36c8d..76129dabb8 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
@@ -265,6 +265,9 @@ public class ImmutableSegmentImpl implements 
ImmutableSegment {
   public void destroy() {
     String segmentName = getSegmentName();
     LOGGER.info("Trying to destroy segment : {}", segmentName);
+    if (_partitionUpsertMetadataManager != null) {
+      _partitionUpsertMetadataManager.untrackSegmentForUpsertView(this);
+    }
     // StarTreeIndexContainer refers to other column index containers, so 
close it firstly.
     if (_starTreeIndexContainer != null) {
       try {
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
index 730d7a02ea..908f220471 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
@@ -91,6 +91,7 @@ import 
org.apache.pinot.segment.spi.partition.PartitionFunction;
 import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
 import org.apache.pinot.spi.config.table.IndexConfig;
 import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
+import org.apache.pinot.spi.config.table.UpsertConfig;
 import org.apache.pinot.spi.config.table.ingestion.AggregationConfig;
 import org.apache.pinot.spi.data.DimensionFieldSpec;
 import org.apache.pinot.spi.data.FieldSpec;
@@ -162,6 +163,7 @@ public class MutableSegmentImpl implements MutableSegment {
   private final String _dedupTimeColumn;
 
   private final PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
+  private final UpsertConfig.ConsistencyMode _upsertConsistencyMode;
   private final List<String> _upsertComparisonColumns;
   private final String _deleteRecordColumn;
   private final String _upsertOutOfOrderRecordColumn;
@@ -363,6 +365,7 @@ public class MutableSegmentImpl implements MutableSegment {
     }
 
     _partitionUpsertMetadataManager = 
config.getPartitionUpsertMetadataManager();
+    _upsertConsistencyMode = config.getUpsertConsistencyMode();
     if (_partitionUpsertMetadataManager != null) {
       Preconditions.checkState(!isAggregateMetricsEnabled(),
           "Metrics aggregation and upsert cannot be enabled together");
@@ -488,23 +491,37 @@ public class MutableSegmentImpl implements MutableSegment 
{
     if (isUpsertEnabled()) {
       RecordInfo recordInfo = getRecordInfo(row, numDocsIndexed);
       GenericRow updatedRow = 
_partitionUpsertMetadataManager.updateRecord(row, recordInfo);
-      // if record doesn't need to be dropped, then persist in segment and 
update metadata hashmap
-      // we are doing metadata update first followed by segment data update 
here, there can be a scenario where
-      // segment indexing or addNewRow call errors out in those scenario, 
there can be metadata inconsistency where
-      // a key is pointing to some other key's docID
-      // TODO fix this metadata mismatch scenario
-      boolean isOutOfOrderRecord = 
!_partitionUpsertMetadataManager.addRecord(this, recordInfo);
-      if (_upsertOutOfOrderRecordColumn != null) {
-        updatedRow.putValue(_upsertOutOfOrderRecordColumn, 
BooleanUtils.toInt(isOutOfOrderRecord));
-      }
-      if (!isOutOfOrderRecord || !_upsertDropOutOfOrderRecord) {
+      if (_upsertConsistencyMode != UpsertConfig.ConsistencyMode.NONE) {
         updateDictionary(updatedRow);
         addNewRow(numDocsIndexed, updatedRow);
-        // Update number of documents indexed before handling the upsert 
metadata so that the record becomes queryable
-        // once validated
         numDocsIndexed++;
+        canTakeMore = numDocsIndexed < _capacity;
+        _numDocsIndexed = numDocsIndexed;
+        // Index the record and update _numDocsIndexed counter before updating 
the upsert metadata so that the record
+        // becomes queryable before validDocIds bitmaps are updated. This 
order is important for consistent upsert view,
+        // otherwise the latest doc can be missed by query due to 'docId < 
_numDocs' check in query filter operators.
+        // NOTE: out-of-order records can not be dropped or marked when 
consistent upsert view is enabled.
+        _partitionUpsertMetadataManager.addRecord(this, recordInfo);
+      } else {
+        // if record doesn't need to be dropped, then persist in segment and 
update metadata hashmap
+        // we are doing metadata update first followed by segment data update 
here, there can be a scenario where
+        // segment indexing or addNewRow call errors out in those scenario, 
there can be metadata inconsistency where
+        // a key is pointing to some other key's docID
+        // TODO fix this metadata mismatch scenario
+        boolean isOutOfOrderRecord = 
!_partitionUpsertMetadataManager.addRecord(this, recordInfo);
+        if (_upsertOutOfOrderRecordColumn != null) {
+          updatedRow.putValue(_upsertOutOfOrderRecordColumn, 
BooleanUtils.toInt(isOutOfOrderRecord));
+        }
+        if (!isOutOfOrderRecord || !_upsertDropOutOfOrderRecord) {
+          updateDictionary(updatedRow);
+          addNewRow(numDocsIndexed, updatedRow);
+          // Update number of documents indexed before handling the upsert 
metadata so that the record becomes queryable
+          // once validated
+          numDocsIndexed++;
+        }
+        canTakeMore = numDocsIndexed < _capacity;
+        _numDocsIndexed = numDocsIndexed;
       }
-      canTakeMore = numDocsIndexed < _capacity;
     } else {
       // Update dictionary first
       updateDictionary(row);
@@ -523,8 +540,8 @@ public class MutableSegmentImpl implements MutableSegment {
         aggregateMetrics(row, docId);
         canTakeMore = true;
       }
+      _numDocsIndexed = numDocsIndexed;
     }
-    _numDocsIndexed = numDocsIndexed;
 
     // Update last indexed time and latest ingestion time
     _lastIndexedTimeMs = System.currentTimeMillis();
@@ -960,7 +977,9 @@ public class MutableSegmentImpl implements MutableSegment {
   @Override
   public void destroy() {
     _logger.info("Trying to close RealtimeSegmentImpl : {}", _segmentName);
-
+    if (_partitionUpsertMetadataManager != null) {
+      _partitionUpsertMetadataManager.untrackSegmentForUpsertView(this);
+    }
     // Gather statistics for off-heap mode
     if (_offHeap) {
       if (_numDocsIndexed > 0) {
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java
index 2d5e7d10d2..6fb0c203f5 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java
@@ -60,6 +60,7 @@ public class RealtimeSegmentConfig {
   private final boolean _aggregateMetrics;
   private final boolean _nullHandlingEnabled;
   private final UpsertConfig.Mode _upsertMode;
+  private final UpsertConfig.ConsistencyMode _upsertConsistencyMode;
   private final List<String> _upsertComparisonColumns;
   private final String _upsertDeleteRecordColumn;
   private final String _upsertOutOfOrderRecordColumn;
@@ -74,14 +75,14 @@ public class RealtimeSegmentConfig {
   // TODO: Clean up this constructor. Most of these things can be extracted 
from tableConfig.
   private RealtimeSegmentConfig(String tableNameWithType, String segmentName, 
String streamName, Schema schema,
       String timeColumnName, int capacity, int avgNumMultiValues, Map<String, 
FieldIndexConfigs> indexConfigByCol,
-      SegmentZKMetadata segmentZKMetadata, boolean offHeap,
-      PinotDataBufferMemoryManager memoryManager, RealtimeSegmentStatsHistory 
statsHistory, String partitionColumn,
-      PartitionFunction partitionFunction, int partitionId, boolean 
aggregateMetrics, boolean nullHandlingEnabled,
-      String consumerDir, UpsertConfig.Mode upsertMode, List<String> 
upsertComparisonColumns,
-      String upsertDeleteRecordColumn, String upsertOutOfOrderRecordColumn, 
boolean upsertDropOutOfOrderRecord,
-      PartitionUpsertMetadataManager partitionUpsertMetadataManager, String 
dedupTimeColumn,
-      PartitionDedupMetadataManager partitionDedupMetadataManager, 
List<FieldConfig> fieldConfigList,
-      List<AggregationConfig> ingestionAggregationConfigs) {
+      SegmentZKMetadata segmentZKMetadata, boolean offHeap, 
PinotDataBufferMemoryManager memoryManager,
+      RealtimeSegmentStatsHistory statsHistory, String partitionColumn, 
PartitionFunction partitionFunction,
+      int partitionId, boolean aggregateMetrics, boolean nullHandlingEnabled, 
String consumerDir,
+      UpsertConfig.Mode upsertMode, UpsertConfig.ConsistencyMode 
upsertConsistencyMode,
+      List<String> upsertComparisonColumns, String upsertDeleteRecordColumn, 
String upsertOutOfOrderRecordColumn,
+      boolean upsertDropOutOfOrderRecord, PartitionUpsertMetadataManager 
partitionUpsertMetadataManager,
+      String dedupTimeColumn, PartitionDedupMetadataManager 
partitionDedupMetadataManager,
+      List<FieldConfig> fieldConfigList, List<AggregationConfig> 
ingestionAggregationConfigs) {
     _tableNameWithType = tableNameWithType;
     _segmentName = segmentName;
     _streamName = streamName;
@@ -101,6 +102,7 @@ public class RealtimeSegmentConfig {
     _nullHandlingEnabled = nullHandlingEnabled;
     _consumerDir = consumerDir;
     _upsertMode = upsertMode != null ? upsertMode : UpsertConfig.Mode.NONE;
+    _upsertConsistencyMode = upsertConsistencyMode != null ? 
upsertConsistencyMode : UpsertConfig.ConsistencyMode.NONE;
     _upsertComparisonColumns = upsertComparisonColumns;
     _upsertDeleteRecordColumn = upsertDeleteRecordColumn;
     _upsertOutOfOrderRecordColumn = upsertOutOfOrderRecordColumn;
@@ -188,6 +190,10 @@ public class RealtimeSegmentConfig {
     return _upsertMode;
   }
 
+  public UpsertConfig.ConsistencyMode getUpsertConsistencyMode() { // never null: the constructor defaults it to NONE
+    return _upsertConsistencyMode;
+  }
+
   public boolean isDedupEnabled() {
     return _partitionDedupMetadataManager != null;
   }
@@ -248,6 +254,7 @@ public class RealtimeSegmentConfig {
     private boolean _nullHandlingEnabled = false;
     private String _consumerDir;
     private UpsertConfig.Mode _upsertMode;
+    private UpsertConfig.ConsistencyMode _upsertConsistencyMode;
     private List<String> _upsertComparisonColumns;
     private String _upsertDeleteRecordColumn;
     private String _upsertOutOfOrderRecordColumn;
@@ -383,6 +390,11 @@ public class RealtimeSegmentConfig {
       return this;
     }
 
+    public Builder setUpsertConsistencyMode(UpsertConfig.ConsistencyMode 
upsertConsistencyMode) {
+      _upsertConsistencyMode = upsertConsistencyMode; // null is accepted; the built config treats it as ConsistencyMode.NONE
+      return this;
+    }
+
     public Builder setUpsertComparisonColumns(List<String> 
upsertComparisonColumns) {
       _upsertComparisonColumns = upsertComparisonColumns;
       return this;
@@ -437,8 +449,8 @@ public class RealtimeSegmentConfig {
       return new RealtimeSegmentConfig(_tableNameWithType, _segmentName, 
_streamName, _schema, _timeColumnName,
           _capacity, _avgNumMultiValues, 
Collections.unmodifiableMap(indexConfigByCol), _segmentZKMetadata, _offHeap,
           _memoryManager, _statsHistory, _partitionColumn, _partitionFunction, 
_partitionId, _aggregateMetrics,
-          _nullHandlingEnabled, _consumerDir, _upsertMode, 
_upsertComparisonColumns, _upsertDeleteRecordColumn,
-          _upsertOutOfOrderRecordColumn, _upsertDropOutOfOrderRecord,
+          _nullHandlingEnabled, _consumerDir, _upsertMode, 
_upsertConsistencyMode, _upsertComparisonColumns,
+          _upsertDeleteRecordColumn, _upsertOutOfOrderRecordColumn, 
_upsertDropOutOfOrderRecord,
           _partitionUpsertMetadataManager, _dedupTimeColumn, 
_partitionDedupMetadataManager, _fieldConfigList,
           _ingestionAggregationConfigs);
     }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
index 6951fc8197..880a7ace86 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -33,7 +33,6 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
@@ -58,7 +57,6 @@ import org.apache.pinot.common.metrics.ServerTimer;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.common.utils.SegmentUtils;
 import org.apache.pinot.common.utils.UploadedRealtimeSegmentName;
-import org.apache.pinot.common.utils.config.QueryOptionsUtils;
 import org.apache.pinot.common.utils.helix.HelixHelper;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
 import org.apache.pinot.segment.local.indexsegment.immutable.EmptyIndexSegment;
@@ -70,7 +68,6 @@ import org.apache.pinot.segment.local.utils.HashUtils;
 import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.MutableSegment;
-import org.apache.pinot.segment.spi.SegmentContext;
 import org.apache.pinot.segment.spi.V1Constants;
 import 
org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
 import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
@@ -138,19 +135,8 @@ public abstract class BasePartitionUpsertMetadataManager 
implements PartitionUps
   private final Lock _preloadLock = new ReentrantLock();
   private volatile boolean _isPreloading;
 
-  // There are two consistency modes:
-  // If using SYNC mode, the upsert threads take the WLock when the upsert 
involves two segments' bitmaps; and
-  // the query threads take the RLock when getting bitmaps for all its 
selected segments.
-  // If using SNAPSHOT mode, the query threads don't need to take lock when 
getting bitmaps for all its selected
-  // segments, as the query threads access a copy of bitmaps that are kept 
updated by upsert thread periodically. But
-  // the query thread can specify a freshness threshold query option to 
refresh the bitmap copies if not fresh enough.
-  // By default, the mode is NONE to disable the support for data consistency.
-  private final UpsertConfig.ConsistencyMode _consistencyMode;
-  private final long _upsertViewRefreshIntervalMs;
-  private final ReadWriteLock _upsertViewLock = new ReentrantReadWriteLock();
-  private final Set<IndexSegment> _updatedSegmentsSinceLastRefresh = 
ConcurrentHashMap.newKeySet();
-  private volatile long _lastUpsertViewRefreshTimeMs = 0;
-  private volatile Map<IndexSegment, MutableRoaringBitmap> 
_segmentQueryableDocIdsMap;
+  // By default, the upsert consistency mode is NONE and upsertViewManager is 
disabled.
+  private final UpsertViewManager _upsertViewManager;
 
   protected BasePartitionUpsertMetadataManager(String tableNameWithType, int 
partitionId, UpsertContext context) {
     _tableNameWithType = tableNameWithType;
@@ -168,8 +154,11 @@ public abstract class BasePartitionUpsertMetadataManager 
implements PartitionUps
     _deletedKeysTTL = context.getDeletedKeysTTL();
     _tableIndexDir = context.getTableIndexDir();
     UpsertConfig.ConsistencyMode cmode = context.getConsistencyMode();
-    _consistencyMode = cmode != null ? cmode : 
UpsertConfig.ConsistencyMode.NONE;
-    _upsertViewRefreshIntervalMs = context.getUpsertViewRefreshIntervalMs();
+    if (cmode == UpsertConfig.ConsistencyMode.SYNC || cmode == 
UpsertConfig.ConsistencyMode.SNAPSHOT) {
+      _upsertViewManager = new UpsertViewManager(cmode, context);
+    } else {
+      _upsertViewManager = null;
+    }
     _serverMetrics = ServerMetrics.get();
     _logger = LoggerFactory.getLogger(tableNameWithType + "-" + partitionId + 
"-" + getClass().getSimpleName());
     if (_metadataTTL > 0) {
@@ -706,8 +695,12 @@ public abstract class BasePartitionUpsertMetadataManager 
implements PartitionUps
     //       (1) skip loading segments without any invalid docs.
     //       (2) assign the invalid docs from the replaced segment to the new 
segment.
     String segmentName = segment.getSegmentName();
-    MutableRoaringBitmap validDocIdsForOldSegment =
-        oldSegment.getValidDocIds() != null ? 
oldSegment.getValidDocIds().getMutableRoaringBitmap() : null;
+    MutableRoaringBitmap validDocIdsForOldSegment = null;
+    if (_upsertViewManager == null) {
+      // When not using consistency mode, we use a copy of the validDocIds 
bitmap of the old segment to keep the old
+      // segment intact during segment replacement and queries access the old 
segment during segment replacement.
+      validDocIdsForOldSegment = getValidDocIdsForOldSegment(oldSegment);
+    }
     if (recordInfoIterator != null) {
       Preconditions.checkArgument(segment instanceof ImmutableSegmentImpl,
           "Got unsupported segment implementation: {} for segment: {}, table: 
{}", segment.getClass(), segmentName,
@@ -721,7 +714,11 @@ public abstract class BasePartitionUpsertMetadataManager 
implements PartitionUps
       addOrReplaceSegment((ImmutableSegmentImpl) segment, validDocIds, 
queryableDocIds, recordInfoIterator, oldSegment,
           validDocIdsForOldSegment);
     }
-
+    if (_upsertViewManager != null) {
+      // When using consistency mode, the old segment's bitmap is updated in 
place, so we get the validDocIds after
+      // segment replacement is done.
+      validDocIdsForOldSegment = getValidDocIdsForOldSegment(oldSegment);
+    }
     if (validDocIdsForOldSegment != null && 
!validDocIdsForOldSegment.isEmpty()) {
       int numKeysNotReplaced = validDocIdsForOldSegment.getCardinality();
       if (_partialUpsertHandler != null) {
@@ -740,6 +737,10 @@ public abstract class BasePartitionUpsertMetadataManager 
implements PartitionUps
     }
   }
 
+  private MutableRoaringBitmap getValidDocIdsForOldSegment(IndexSegment 
oldSegment) { // returns null when the old segment has no validDocIds bitmap
+    return oldSegment.getValidDocIds() != null ? 
oldSegment.getValidDocIds().getMutableRoaringBitmap() : null;
+  }
+
   protected void removeSegment(IndexSegment segment, MutableRoaringBitmap 
validDocIds) {
     try (PrimaryKeyReader primaryKeyReader = new PrimaryKeyReader(segment, 
_primaryKeyColumns)) {
       removeSegment(segment, 
UpsertUtils.getPrimaryKeyIterator(primaryKeyReader, validDocIds));
@@ -1132,31 +1133,12 @@ public abstract class 
BasePartitionUpsertMetadataManager implements PartitionUps
   protected void replaceDocId(IndexSegment newSegment, 
ThreadSafeMutableRoaringBitmap validDocIds,
       @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, IndexSegment 
oldSegment, int oldDocId, int newDocId,
       RecordInfo recordInfo) {
-    if (_consistencyMode == UpsertConfig.ConsistencyMode.SYNC) {
-      _upsertViewLock.writeLock().lock();
-      try {
-        doRemoveDocId(oldSegment, oldDocId);
-        doAddDocId(validDocIds, queryableDocIds, newDocId, recordInfo);
-      } finally {
-        _upsertViewLock.writeLock().unlock();
-      }
-    } else if (_consistencyMode == UpsertConfig.ConsistencyMode.SNAPSHOT) {
-      _upsertViewLock.readLock().lock();
-      try {
-        doRemoveDocId(oldSegment, oldDocId);
-        doAddDocId(validDocIds, queryableDocIds, newDocId, recordInfo);
-        _updatedSegmentsSinceLastRefresh.add(newSegment);
-        _updatedSegmentsSinceLastRefresh.add(oldSegment);
-      } finally {
-        _upsertViewLock.readLock().unlock();
-        // Batch refresh takes WLock. Do it outside RLock for clarity. The R/W 
lock ensures that only one thread
-        // can refresh the bitmaps. The other threads that are about to update 
the bitmaps will be blocked until
-        // refreshing is done.
-        doBatchRefreshUpsertView(_upsertViewRefreshIntervalMs);
-      }
+    if (_upsertViewManager == null) {
+      UpsertUtils.doRemoveDocId(oldSegment, oldDocId);
+      UpsertUtils.doAddDocId(validDocIds, queryableDocIds, newDocId, 
recordInfo);
     } else {
-      doRemoveDocId(oldSegment, oldDocId);
-      doAddDocId(validDocIds, queryableDocIds, newDocId, recordInfo);
+      _upsertViewManager.replaceDocId(newSegment, validDocIds, 
queryableDocIds, oldSegment, oldDocId, newDocId,
+          recordInfo);
     }
     trackUpdatedSegmentsSinceLastSnapshot(oldSegment);
   }
@@ -1169,71 +1151,27 @@ public abstract class 
BasePartitionUpsertMetadataManager implements PartitionUps
    */
   protected void replaceDocId(IndexSegment segment, 
ThreadSafeMutableRoaringBitmap validDocIds,
       @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, int oldDocId, 
int newDocId, RecordInfo recordInfo) {
-    if (_consistencyMode != UpsertConfig.ConsistencyMode.SNAPSHOT) {
-      doReplaceDocId(validDocIds, queryableDocIds, oldDocId, newDocId, 
recordInfo);
+    if (_upsertViewManager == null) {
+      UpsertUtils.doReplaceDocId(validDocIds, queryableDocIds, oldDocId, 
newDocId, recordInfo);
     } else {
-      _upsertViewLock.readLock().lock();
-      try {
-        doReplaceDocId(validDocIds, queryableDocIds, oldDocId, newDocId, 
recordInfo);
-        _updatedSegmentsSinceLastRefresh.add(segment);
-      } finally {
-        _upsertViewLock.readLock().unlock();
-        // Batch refresh takes WLock. Do it outside RLock for clarity.
-        doBatchRefreshUpsertView(_upsertViewRefreshIntervalMs);
-      }
-    }
-  }
-
-  private void doReplaceDocId(ThreadSafeMutableRoaringBitmap validDocIds,
-      @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, int oldDocId, 
int newDocId, RecordInfo recordInfo) {
-    validDocIds.replace(oldDocId, newDocId);
-    if (queryableDocIds != null) {
-      if (recordInfo.isDeleteRecord()) {
-        queryableDocIds.remove(oldDocId);
-      } else {
-        queryableDocIds.replace(oldDocId, newDocId);
-      }
+      _upsertViewManager.replaceDocId(segment, validDocIds, queryableDocIds, 
oldDocId, newDocId, recordInfo);
     }
   }
 
   protected void addDocId(IndexSegment segment, ThreadSafeMutableRoaringBitmap 
validDocIds,
       @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, int docId, 
RecordInfo recordInfo) {
-    if (_consistencyMode != UpsertConfig.ConsistencyMode.SNAPSHOT) {
-      doAddDocId(validDocIds, queryableDocIds, docId, recordInfo);
+    if (_upsertViewManager == null) {
+      UpsertUtils.doAddDocId(validDocIds, queryableDocIds, docId, recordInfo);
     } else {
-      _upsertViewLock.readLock().lock();
-      try {
-        doAddDocId(validDocIds, queryableDocIds, docId, recordInfo);
-        _updatedSegmentsSinceLastRefresh.add(segment);
-      } finally {
-        _upsertViewLock.readLock().unlock();
-        // Batch refresh takes WLock. Do it outside RLock for clarity.
-        doBatchRefreshUpsertView(_upsertViewRefreshIntervalMs);
-      }
-    }
-  }
-
-  private void doAddDocId(ThreadSafeMutableRoaringBitmap validDocIds,
-      @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, int docId, 
RecordInfo recordInfo) {
-    validDocIds.add(docId);
-    if (queryableDocIds != null && !recordInfo.isDeleteRecord()) {
-      queryableDocIds.add(docId);
+      _upsertViewManager.addDocId(segment, validDocIds, queryableDocIds, 
docId, recordInfo);
     }
   }
 
   protected void removeDocId(IndexSegment segment, int docId) {
-    if (_consistencyMode != UpsertConfig.ConsistencyMode.SNAPSHOT) {
-      doRemoveDocId(segment, docId);
+    if (_upsertViewManager == null) {
+      UpsertUtils.doRemoveDocId(segment, docId);
     } else {
-      _upsertViewLock.readLock().lock();
-      try {
-        doRemoveDocId(segment, docId);
-        _updatedSegmentsSinceLastRefresh.add(segment);
-      } finally {
-        _upsertViewLock.readLock().unlock();
-        // Batch refresh takes WLock. Do it outside RLock for clarity.
-        doBatchRefreshUpsertView(_upsertViewRefreshIntervalMs);
-      }
+      _upsertViewManager.removeDocId(segment, docId);
     }
     trackUpdatedSegmentsSinceLastSnapshot(segment);
   }
@@ -1249,112 +1187,25 @@ public abstract class 
BasePartitionUpsertMetadataManager implements PartitionUps
     }
   }
 
-  private void doRemoveDocId(IndexSegment segment, int docId) {
-    Objects.requireNonNull(segment.getValidDocIds()).remove(docId);
-    ThreadSafeMutableRoaringBitmap currentQueryableDocIds = 
segment.getQueryableDocIds();
-    if (currentQueryableDocIds != null) {
-      currentQueryableDocIds.remove(docId);
-    }
-  }
-
-  /**
-   * Use the segmentContexts to collect the contexts for selected segments. 
Reuse the segmentContext object if
-   * present, to avoid overwriting the contexts specified at the others places.
-   */
-  public void setSegmentContexts(List<SegmentContext> segmentContexts, 
Map<String, String> queryOptions) {
-    if (_consistencyMode == UpsertConfig.ConsistencyMode.NONE) {
-      setSegmentContexts(segmentContexts);
-      return;
-    }
-    if (_consistencyMode == UpsertConfig.ConsistencyMode.SYNC) {
-      _upsertViewLock.readLock().lock();
-      try {
-        setSegmentContexts(segmentContexts);
-        return;
-      } finally {
-        _upsertViewLock.readLock().unlock();
-      }
-    }
-    // If batch refresh is enabled, the copy of bitmaps is kept updated and 
ready to use for a consistent view.
-    // The locking between query threads and upsert threads can be avoided 
when using batch refresh.
-    // Besides, queries can share the copy of bitmaps, w/o cloning the bitmaps 
by every single query.
-    // If query has specified a need for certain freshness, check the view and 
refresh it as needed.
-    // When refreshing the copy of map, we need to take the WLock so only one 
thread is refreshing view.
-    long upsertViewFreshnessMs =
-        Math.min(QueryOptionsUtils.getUpsertViewFreshnessMs(queryOptions), 
_upsertViewRefreshIntervalMs);
-    if (upsertViewFreshnessMs < 0) {
-      upsertViewFreshnessMs = _upsertViewRefreshIntervalMs;
-    }
-    doBatchRefreshUpsertView(upsertViewFreshnessMs);
-    Map<IndexSegment, MutableRoaringBitmap> currentUpsertView = 
_segmentQueryableDocIdsMap;
-    for (SegmentContext segmentContext : segmentContexts) {
-      IndexSegment segment = segmentContext.getIndexSegment();
-      MutableRoaringBitmap segmentView = currentUpsertView.get(segment);
-      if (segmentView != null) {
-        segmentContext.setQueryableDocIdsSnapshot(segmentView);
-      }
-    }
+  protected void doClose()
+      throws IOException {
   }
 
-  private void setSegmentContexts(List<SegmentContext> segmentContexts) {
-    for (SegmentContext segmentContext : segmentContexts) {
-      IndexSegment segment = segmentContext.getIndexSegment();
-      if (_trackedSegments.contains(segment)) {
-        
segmentContext.setQueryableDocIdsSnapshot(UpsertUtils.getQueryableDocIdsSnapshotFromSegment(segment));
-      }
-    }
+  public UpsertViewManager getUpsertViewManager() {
+    return _upsertViewManager;
   }
 
-  private boolean skipUpsertViewRefresh(long upsertViewFreshnessMs) {
-    if (upsertViewFreshnessMs < 0) {
-      return true;
+  @Override
+  public void trackSegmentForUpsertView(IndexSegment segment) {
+    if (_upsertViewManager != null) {
+      _upsertViewManager.trackSegment(segment);
     }
-    return _lastUpsertViewRefreshTimeMs + upsertViewFreshnessMs > 
System.currentTimeMillis();
   }
 
-  private void doBatchRefreshUpsertView(long upsertViewFreshnessMs) {
-    // Always refresh if the current view is still empty.
-    if (skipUpsertViewRefresh(upsertViewFreshnessMs) && 
_segmentQueryableDocIdsMap != null) {
-      return;
-    }
-    _upsertViewLock.writeLock().lock();
-    try {
-      // Check again with lock, and always refresh if the current view is 
still empty.
-      Map<IndexSegment, MutableRoaringBitmap> current = 
_segmentQueryableDocIdsMap;
-      if (skipUpsertViewRefresh(upsertViewFreshnessMs) && current != null) {
-        return;
-      }
-      Map<IndexSegment, MutableRoaringBitmap> updated = new HashMap<>();
-      for (IndexSegment segment : _trackedSegments) {
-        // Update bitmap for segment updated since last refresh or not in the 
view yet. This also handles segments
-        // that are tracked by _trackedSegments but not by 
_updatedSegmentsSinceLastRefresh, like those didn't update
-        // any bitmaps as their docs simply lost all the upsert comparisons 
with the existing docs.
-        if (current == null || current.get(segment) == null || 
_updatedSegmentsSinceLastRefresh.contains(segment)) {
-          updated.put(segment, 
UpsertUtils.getQueryableDocIdsSnapshotFromSegment(segment));
-        } else {
-          updated.put(segment, current.get(segment));
-        }
-      }
-      // Swap in the new consistent set of bitmaps.
-      _segmentQueryableDocIdsMap = updated;
-      _updatedSegmentsSinceLastRefresh.clear();
-      _lastUpsertViewRefreshTimeMs = System.currentTimeMillis();
-    } finally {
-      _upsertViewLock.writeLock().unlock();
+  @Override
+  public void untrackSegmentForUpsertView(IndexSegment segment) {
+    if (_upsertViewManager != null) {
+      _upsertViewManager.untrackSegment(segment);
     }
   }
-
-  @VisibleForTesting
-  Map<IndexSegment, MutableRoaringBitmap> getSegmentQueryableDocIdsMap() {
-    return _segmentQueryableDocIdsMap;
-  }
-
-  @VisibleForTesting
-  Set<IndexSegment> getUpdatedSegmentsSinceLastRefresh() {
-    return _updatedSegmentsSinceLastRefresh;
-  }
-
-  protected void doClose()
-      throws IOException {
-  }
 }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
index 400519079b..78a76474b6 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
@@ -111,6 +111,11 @@ public abstract class BaseTableUpsertMetadataManager 
implements TableUpsertMetad
     return _context.getPartialUpsertHandler() == null ? UpsertConfig.Mode.FULL 
: UpsertConfig.Mode.PARTIAL;
   }
 
+  @Override
+  public UpsertConfig.ConsistencyMode getUpsertConsistencyMode() {
+    return _consistencyMode;
+  }
+
   @Override
   public boolean isEnablePreload() {
     return _enablePreload;
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
index a0bf20a2de..ba24f2c912 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
@@ -103,9 +103,14 @@ public class ConcurrentMapPartitionUpsertMetadataManager 
extends BasePartitionUp
               // snapshot for the old segment, which can be updated and used 
to track the docs not replaced yet.
               if (currentSegment == oldSegment) {
                 if (comparisonResult >= 0) {
-                  addDocId(segment, validDocIds, queryableDocIds, newDocId, 
recordInfo);
-                  if (validDocIdsForOldSegment != null) {
-                    validDocIdsForOldSegment.remove(currentDocId);
+                  if (validDocIdsForOldSegment == null && oldSegment != null 
&& oldSegment.getValidDocIds() != null) {
+                    // Update the old segment's bitmap in place if a copy of 
the bitmap was not provided.
+                    replaceDocId(segment, validDocIds, queryableDocIds, 
oldSegment, currentDocId, newDocId, recordInfo);
+                  } else {
+                    addDocId(segment, validDocIds, queryableDocIds, newDocId, 
recordInfo);
+                    if (validDocIdsForOldSegment != null) {
+                      validDocIdsForOldSegment.remove(currentDocId);
+                    }
                   }
                   return new RecordLocation(segment, newDocId, 
newComparisonValue);
                 } else {
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java
index 7054b1c030..b49d09e04b 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java
@@ -120,9 +120,14 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes
               // snapshot for the old segment, which can be updated and used 
to track the docs not replaced yet.
               if (currentSegment == oldSegment) {
                 if (comparisonResult >= 0) {
-                  addDocId(segment, validDocIds, queryableDocIds, newDocId, 
recordInfo);
-                  if (validDocIdsForOldSegment != null) {
-                    validDocIdsForOldSegment.remove(currentDocId);
+                  if (validDocIdsForOldSegment == null && oldSegment != null 
&& oldSegment.getValidDocIds() != null) {
+                    // Update the old segment's bitmap in place if a copy of 
the bitmap was not provided.
+                    replaceDocId(segment, validDocIds, queryableDocIds, 
oldSegment, currentDocId, newDocId, recordInfo);
+                  } else {
+                    addDocId(segment, validDocIds, queryableDocIds, newDocId, 
recordInfo);
+                    if (validDocIdsForOldSegment != null) {
+                      validDocIdsForOldSegment.remove(currentDocId);
+                    }
                   }
                   return new RecordLocation(segment, newDocId, 
newComparisonValue,
                       
RecordLocation.incrementSegmentCount(currentDistinctSegmentCount));
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
index 45a6487a82..7280b86833 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
@@ -20,14 +20,19 @@ package org.apache.pinot.segment.local.upsert;
 
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.pinot.common.utils.config.QueryOptionsUtils;
+import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.SegmentContext;
 import org.apache.pinot.spi.config.table.UpsertConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
@@ -35,6 +40,8 @@ import org.apache.pinot.spi.config.table.UpsertConfig;
  */
 @ThreadSafe
 public class ConcurrentMapTableUpsertMetadataManager extends 
BaseTableUpsertMetadataManager {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ConcurrentMapTableUpsertMetadataManager.class);
+
   private final Map<Integer, BasePartitionUpsertMetadataManager> 
_partitionMetadataManagerMap =
       new ConcurrentHashMap<>();
 
@@ -62,22 +69,54 @@ public class ConcurrentMapTableUpsertMetadataManager 
extends BaseTableUpsertMeta
     return partitionToPrimaryKeyCount;
   }
 
+  @Override
+  public void lockForSegmentContexts() {
+    _partitionMetadataManagerMap.forEach(
+        (partitionID, upsertMetadataManager) -> 
upsertMetadataManager.getUpsertViewManager().lockTrackedSegments());
+  }
+
+  @Override
+  public void unlockForSegmentContexts() {
+    _partitionMetadataManagerMap.forEach(
+        (partitionID, upsertMetadataManager) -> 
upsertMetadataManager.getUpsertViewManager().unlockTrackedSegments());
+  }
+
+  @Override
+  public Set<String> getOptionalSegments() {
+    Set<String> optionalSegments = new HashSet<>();
+    _partitionMetadataManagerMap.forEach((partitionID, upsertMetadataManager) 
-> optionalSegments.addAll(
+        upsertMetadataManager.getUpsertViewManager().getOptionalSegments()));
+    return optionalSegments;
+  }
+
   @Override
   public void setSegmentContexts(List<SegmentContext> segmentContexts, 
Map<String, String> queryOptions) {
-    if (_consistencyMode != UpsertConfig.ConsistencyMode.NONE && 
!QueryOptionsUtils.isSkipUpsertView(queryOptions)) {
-      // Get queryableDocIds bitmaps from partitionMetadataManagers if any 
consistency mode is used.
-      _partitionMetadataManagerMap.forEach(
-          (partitionID, upsertMetadataManager) -> 
upsertMetadataManager.setSegmentContexts(segmentContexts,
-              queryOptions));
-    }
-    // If no consistency mode is used, we get queryableDocIds bitmaps as kept 
by the segment objects directly.
-    // Even if consistency mode is used, we should still check if any segment 
doesn't get its validDocIds bitmap,
-    // because partitionMetadataManagers may not track all segments of the 
table, like those out of the metadata TTL.
-    for (SegmentContext segmentContext : segmentContexts) {
-      if (segmentContext.getQueryableDocIdsSnapshot() == null) {
+    // Get queryableDocIds bitmaps from partitionMetadataManagers if any 
consistency mode is used.
+    // Otherwise, get queryableDocIds bitmaps as kept by the segment objects 
directly as before.
+    if (_consistencyMode == UpsertConfig.ConsistencyMode.NONE || 
QueryOptionsUtils.isSkipUpsertView(queryOptions)) {
+      for (SegmentContext segmentContext : segmentContexts) {
         IndexSegment segment = segmentContext.getIndexSegment();
         
segmentContext.setQueryableDocIdsSnapshot(UpsertUtils.getQueryableDocIdsSnapshotFromSegment(segment));
       }
+      return;
+    }
+    // All segments should have been tracked by partitionMetadataManagers to 
provide queries consistent upsert view.
+    _partitionMetadataManagerMap.forEach(
+        (partitionID, upsertMetadataManager) -> 
upsertMetadataManager.getUpsertViewManager()
+            .setSegmentContexts(segmentContexts, queryOptions));
+    if (LOGGER.isDebugEnabled()) {
+      for (SegmentContext segmentContext : segmentContexts) {
+        IndexSegment segment = segmentContext.getIndexSegment();
+        if (segmentContext.getQueryableDocIdsSnapshot() == null) {
+          LOGGER.debug("No upsert view for segment: {}, type: {}, total: {}", 
segment.getSegmentName(),
+              (segment instanceof ImmutableSegment ? "imm" : "mut"), 
segment.getSegmentMetadata().getTotalDocs());
+        } else {
+          int cardCnt = 
segmentContext.getQueryableDocIdsSnapshot().getCardinality();
+          LOGGER.debug("Got upsert view of segment: {}, type: {}, total: {}, 
valid: {}", segment.getSegmentName(),
+              (segment instanceof ImmutableSegment ? "imm" : "mut"), 
segment.getSegmentMetadata().getTotalDocs(),
+              cardCnt);
+        }
+      }
     }
   }
 
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
index 1c72470242..56c1ca7e14 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
@@ -116,4 +116,17 @@ public interface PartitionUpsertMetadataManager extends 
Closeable {
    * Stops the metadata manager. After invoking this method, no access to the 
metadata will be accepted.
    */
   void stop();
+
+  /**
+   * Track segments for upsert view, and this method must be called before 
registering segment to the table manager,
+   * so that the segment is included in the upsert view before it becomes 
visible to the query.
+   */
+  void trackSegmentForUpsertView(IndexSegment segment);
+
+  /**
+   * Untrack segments for upsert view, and this method must be called when 
segment is to be destroyed, when the
+   * segment is not used by any queries. Untrack segment after unregistering 
the segment is not safe, as there may be
+   * ongoing queries that are still accessing the segment.
+   */
+  void untrackSegmentForUpsertView(IndexSegment segment);
 }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
index 123e3db553..b16c66af50 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
@@ -19,8 +19,10 @@
 package org.apache.pinot.segment.local.upsert;
 
 import java.io.Closeable;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
 import org.apache.pinot.segment.spi.SegmentContext;
@@ -41,6 +43,8 @@ public interface TableUpsertMetadataManager extends Closeable 
{
 
   UpsertConfig.Mode getUpsertMode();
 
+  UpsertConfig.ConsistencyMode getUpsertConsistencyMode();
+
   boolean isEnablePreload();
 
   /**
@@ -57,4 +61,14 @@ public interface TableUpsertMetadataManager extends 
Closeable {
 
   default void setSegmentContexts(List<SegmentContext> segmentContexts, 
Map<String, String> queryOptions) {
   }
+
+  default void lockForSegmentContexts() {
+  }
+
+  default void unlockForSegmentContexts() {
+  }
+
+  default Set<String> getOptionalSegments() {
+    return Collections.emptySet();
+  }
 }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
index 3086da1969..42c20f8aad 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
@@ -22,6 +22,7 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Objects;
 import javax.annotation.Nullable;
 import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
 import org.apache.pinot.segment.local.segment.readers.PrimaryKeyReader;
@@ -41,12 +42,48 @@ public class UpsertUtils {
 
   @Nullable
   public static MutableRoaringBitmap 
getQueryableDocIdsSnapshotFromSegment(IndexSegment segment) {
+    return getQueryableDocIdsSnapshotFromSegment(segment, false);
+  }
+
+  public static MutableRoaringBitmap 
getQueryableDocIdsSnapshotFromSegment(IndexSegment segment,
+      boolean useEmptyForNull) {
     ThreadSafeMutableRoaringBitmap queryableDocIds = 
segment.getQueryableDocIds();
     if (queryableDocIds != null) {
       return queryableDocIds.getMutableRoaringBitmap();
     }
     ThreadSafeMutableRoaringBitmap validDocIds = segment.getValidDocIds();
-    return validDocIds != null ? validDocIds.getMutableRoaringBitmap() : null;
+    return validDocIds != null ? validDocIds.getMutableRoaringBitmap()
+        : (useEmptyForNull ? new MutableRoaringBitmap() : null);
+  }
+
+
+  public static void doReplaceDocId(ThreadSafeMutableRoaringBitmap validDocIds,
+      @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, int oldDocId, 
int newDocId, RecordInfo recordInfo) {
+    validDocIds.replace(oldDocId, newDocId);
+    if (queryableDocIds != null) {
+      if (recordInfo.isDeleteRecord()) {
+        queryableDocIds.remove(oldDocId);
+      } else {
+        queryableDocIds.replace(oldDocId, newDocId);
+      }
+    }
+  }
+
+  public static void doAddDocId(ThreadSafeMutableRoaringBitmap validDocIds,
+      @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, int docId, 
RecordInfo recordInfo) {
+    validDocIds.add(docId);
+    if (queryableDocIds != null && !recordInfo.isDeleteRecord()) {
+      queryableDocIds.add(docId);
+    }
+  }
+
+
+  public static void doRemoveDocId(IndexSegment segment, int docId) {
+    Objects.requireNonNull(segment.getValidDocIds()).remove(docId);
+    ThreadSafeMutableRoaringBitmap currentQueryableDocIds = 
segment.getQueryableDocIds();
+    if (currentQueryableDocIds != null) {
+      currentQueryableDocIds.remove(docId);
+    }
   }
 
   /**
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertViewManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertViewManager.java
new file mode 100644
index 0000000000..009eac7969
--- /dev/null
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertViewManager.java
@@ -0,0 +1,327 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.upsert;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.pinot.common.utils.config.QueryOptionsUtils;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.MutableSegment;
+import org.apache.pinot.segment.spi.SegmentContext;
+import 
org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
+import org.apache.pinot.spi.config.table.UpsertConfig;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This class is used to provide the specified consistency mode for upsert 
table by tracking the segments and
+ * synchronizing the accesses to the validDocIds of those tracked segments 
properly. Two consistency modes are
+ * supported currently:
+ * - SYNC mode, the upsert threads take the WLock when the upsert involves two 
segments' bitmaps; and the query
+ * threads take the RLock when getting bitmaps for all its selected segments.
+ * - SNAPSHOT mode, the query threads don't need to take lock when getting 
bitmaps for all its selected segments, as
+ * the query threads access a copy of bitmaps that are kept updated by upsert 
thread periodically. But the query
+ * thread can specify a freshness threshold query option to refresh the bitmap 
copies if not fresh enough.
+ */
+public class UpsertViewManager {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(UpsertViewManager.class);
+  private final UpsertConfig.ConsistencyMode _consistencyMode;
+
+  // NOTE that we can't reuse _trackedSegments map in 
BasePartitionUpsertMetadataManager, as it doesn't track all
+  // segments like those out of the metadata TTL, and it's called after adding 
segments to the table manager so the
+  // new segments become queryable before upsert view can get updated. So we 
use a separate map to track the segments
+  // properly. Besides, updating the set of tracked segments must be 
synchronized with queries getting segment
+  // contexts, so the need of the R/W lock.
+  private final ReadWriteLock _trackedSegmentsLock = new 
ReentrantReadWriteLock();
+  private final Set<IndexSegment> _trackedSegments = 
ConcurrentHashMap.newKeySet();
+  // Optional segments are part of the tracked segments. They can get 
processed by server before getting included in
+  // broker's routing table, like the new consuming segment. Although broker 
misses such segments, the server needs
+  // to acquire them to avoid missing the new valid docs in them.
+  private final Set<String> _optionalSegments = ConcurrentHashMap.newKeySet();
+
+  // Updating and accessing segments' validDocIds bitmaps are synchronized 
with a separate R/W lock for clarity.
+  // The query threads always get _upsertViewTrackedSegmentsLock then 
_upsertViewSegmentDocIdsLock to avoid deadlock.
+  // And the upsert threads never nest the two locks.
+  private final ReadWriteLock _upsertViewLock = new ReentrantReadWriteLock();
+  private volatile Map<IndexSegment, MutableRoaringBitmap> 
_segmentQueryableDocIdsMap;
+
+  // For SNAPSHOT mode, track segments that get new updates since last refresh 
to reduce the overhead of refreshing.
+  private final Set<IndexSegment> _updatedSegmentsSinceLastRefresh = 
ConcurrentHashMap.newKeySet();
+  private volatile long _lastUpsertViewRefreshTimeMs = 0;
+  private final long _upsertViewRefreshIntervalMs;
+
+  public UpsertViewManager(UpsertConfig.ConsistencyMode consistencyMode, 
UpsertContext context) {
+    _consistencyMode = consistencyMode;
+    _upsertViewRefreshIntervalMs = context.getUpsertViewRefreshIntervalMs();
+  }
+
+  /**
+   * Removes {@code oldDocId} from {@code oldSegment} and adds {@code newDocId} to the given bitmaps (presumably the
+   * bitmaps of {@code newSegment}), keeping the upsert view consistent for queries.
+   *
+   * <p>SYNC mode: both steps run under the write lock of {@code _upsertViewLock}. Queries read bitmaps under the read
+   * lock, so they never observe the doc removed from the old segment but not yet added to the new one.
+   *
+   * <p>Any other mode (SNAPSHOT): both steps run under the shared read lock, both segments are recorded as updated,
+   * and afterwards the view is refreshed if it is stale (see doBatchRefreshUpsertView).
+   */
+  public void replaceDocId(IndexSegment newSegment, ThreadSafeMutableRoaringBitmap validDocIds,
+      ThreadSafeMutableRoaringBitmap queryableDocIds, IndexSegment oldSegment, int oldDocId, int newDocId,
+      RecordInfo recordInfo) {
+    if (_consistencyMode == UpsertConfig.ConsistencyMode.SYNC) {
+      _upsertViewLock.writeLock().lock();
+      try {
+        UpsertUtils.doRemoveDocId(oldSegment, oldDocId);
+        UpsertUtils.doAddDocId(validDocIds, queryableDocIds, newDocId, recordInfo);
+        return;
+      } finally {
+        _upsertViewLock.writeLock().unlock();
+      }
+    }
+    // For SNAPSHOT mode, take read lock to sync with the batch refresh.
+    _upsertViewLock.readLock().lock();
+    try {
+      UpsertUtils.doRemoveDocId(oldSegment, oldDocId);
+      UpsertUtils.doAddDocId(validDocIds, queryableDocIds, newDocId, recordInfo);
+      _updatedSegmentsSinceLastRefresh.add(newSegment);
+      _updatedSegmentsSinceLastRefresh.add(oldSegment);
+    } finally {
+      _upsertViewLock.readLock().unlock();
+      // Batch refresh takes WLock. It must run outside the RLock: ReentrantReadWriteLock cannot upgrade a held read
+      // lock to the write lock, so refreshing while still holding the RLock would deadlock. The R/W lock ensures
+      // that only one thread can refresh the bitmaps. The other threads that are about to update the bitmaps will be
+      // blocked until refreshing is done.
+      doBatchRefreshUpsertView(_upsertViewRefreshIntervalMs, false);
+    }
+  }
+
+  /**
+   * Replaces {@code oldDocId} with {@code newDocId} within the bitmaps of a single {@code segment}.
+   *
+   * <p>SYNC mode: applied directly via {@link UpsertUtils#doReplaceDocId} without taking {@code _upsertViewLock}.
+   * NOTE(review): presumably acceptable because only one segment's bitmaps change; confirm that doReplaceDocId is
+   * atomic with respect to the snapshots queries take.
+   *
+   * <p>Any other mode (SNAPSHOT): applied under the shared read lock, the segment is recorded as updated, and
+   * afterwards the view is refreshed if it is stale.
+   */
+  public void replaceDocId(IndexSegment segment, ThreadSafeMutableRoaringBitmap validDocIds,
+      ThreadSafeMutableRoaringBitmap queryableDocIds, int oldDocId, int newDocId, RecordInfo recordInfo) {
+    if (_consistencyMode == UpsertConfig.ConsistencyMode.SYNC) {
+      UpsertUtils.doReplaceDocId(validDocIds, queryableDocIds, oldDocId, newDocId, recordInfo);
+      return;
+    }
+    // For SNAPSHOT mode, take read lock to sync with the batch refresh.
+    _upsertViewLock.readLock().lock();
+    try {
+      UpsertUtils.doReplaceDocId(validDocIds, queryableDocIds, oldDocId, newDocId, recordInfo);
+      _updatedSegmentsSinceLastRefresh.add(segment);
+    } finally {
+      _upsertViewLock.readLock().unlock();
+      // Batch refresh takes WLock. Do it outside RLock, since a held read lock cannot be upgraded to the write lock.
+      doBatchRefreshUpsertView(_upsertViewRefreshIntervalMs, false);
+    }
+  }
+
+  /**
+   * Adds {@code docId} to the given valid/queryable bitmaps of {@code segment}.
+   *
+   * <p>SYNC mode: applied directly without taking {@code _upsertViewLock}. Any other mode (SNAPSHOT): applied under
+   * the shared read lock, the segment is recorded as updated, and afterwards the view is refreshed if it is stale.
+   */
+  public void addDocId(IndexSegment segment, ThreadSafeMutableRoaringBitmap validDocIds,
+      ThreadSafeMutableRoaringBitmap queryableDocIds, int docId, RecordInfo recordInfo) {
+    if (_consistencyMode == UpsertConfig.ConsistencyMode.SYNC) {
+      UpsertUtils.doAddDocId(validDocIds, queryableDocIds, docId, recordInfo);
+      return;
+    }
+    // For SNAPSHOT mode, take read lock to sync with the batch refresh.
+    _upsertViewLock.readLock().lock();
+    try {
+      UpsertUtils.doAddDocId(validDocIds, queryableDocIds, docId, recordInfo);
+      _updatedSegmentsSinceLastRefresh.add(segment);
+    } finally {
+      _upsertViewLock.readLock().unlock();
+      // Batch refresh takes WLock. Do it outside RLock, since a held read lock cannot be upgraded to the write lock.
+      doBatchRefreshUpsertView(_upsertViewRefreshIntervalMs, false);
+    }
+  }
+
+  /**
+   * Removes {@code docId} from the bitmaps of {@code segment}.
+   *
+   * <p>SYNC mode: applied directly without taking {@code _upsertViewLock}. Any other mode (SNAPSHOT): applied under
+   * the shared read lock, the segment is recorded as updated, and afterwards the view is refreshed if it is stale.
+   */
+  public void removeDocId(IndexSegment segment, int docId) {
+    if (_consistencyMode == UpsertConfig.ConsistencyMode.SYNC) {
+      UpsertUtils.doRemoveDocId(segment, docId);
+      return;
+    }
+    // For SNAPSHOT mode, take read lock to sync with the batch refresh.
+    _upsertViewLock.readLock().lock();
+    try {
+      UpsertUtils.doRemoveDocId(segment, docId);
+      _updatedSegmentsSinceLastRefresh.add(segment);
+    } finally {
+      _upsertViewLock.readLock().unlock();
+      // Batch refresh takes WLock. Do it outside RLock, since a held read lock cannot be upgraded to the write lock.
+      doBatchRefreshUpsertView(_upsertViewRefreshIntervalMs, false);
+    }
+  }
+
+  /**
+   * Use the segmentContexts to collect the contexts for selected segments. Reuse the segmentContext object if
+   * present, to avoid overwriting the contexts specified at other places.
+   *
+   * <p>SYNC mode: while holding the read lock of {@code _upsertViewLock}, each context whose segment is tracked gets a
+   * queryable-doc snapshot taken from the segment.
+   *
+   * <p>Any other mode (SNAPSHOT): the shared view is refreshed if it is older than the requested freshness. Each
+   * context whose segment is in the view then gets the shared bitmap object, not a per-query copy. Contexts for
+   * segments absent from the view are left untouched.
+   *
+   * @param segmentContexts contexts to fill in; their queryable-doc snapshots are set in place
+   * @param queryOptions query options; the upsert view freshness option is read from here
+   */
+  public void setSegmentContexts(List<SegmentContext> segmentContexts, Map<String, String> queryOptions) {
+    if (_consistencyMode == UpsertConfig.ConsistencyMode.SYNC) {
+      _upsertViewLock.readLock().lock();
+      try {
+        setSegmentContexts(segmentContexts);
+        return;
+      } finally {
+        _upsertViewLock.readLock().unlock();
+      }
+    }
+    // If batch refresh is enabled, the copy of bitmaps is kept updated and ready to use for a consistent view.
+    // The locking between query threads and upsert threads can be avoided when using batch refresh.
+    // Besides, queries can share the copy of bitmaps, w/o cloning the bitmaps by every single query.
+    // If query has specified a need for certain freshness, check the view and refresh it as needed.
+    // When refreshing the copy of map, we need to take the WLock so only one thread is refreshing view.
+    // The requested freshness is capped at the refresh interval. A negative result (presumably the option is not
+    // set) falls back to the refresh interval.
+    long upsertViewFreshnessMs =
+        Math.min(QueryOptionsUtils.getUpsertViewFreshnessMs(queryOptions), _upsertViewRefreshIntervalMs);
+    if (upsertViewFreshnessMs < 0) {
+      upsertViewFreshnessMs = _upsertViewRefreshIntervalMs;
+    }
+    doBatchRefreshUpsertView(upsertViewFreshnessMs, false);
+    // doBatchRefreshUpsertView always builds a view when none exists, so this read yields a non-null map. Read the
+    // volatile field once so all contexts of this query see the same view.
+    Map<IndexSegment, MutableRoaringBitmap> currentUpsertView = _segmentQueryableDocIdsMap;
+    for (SegmentContext segmentContext : segmentContexts) {
+      IndexSegment segment = segmentContext.getIndexSegment();
+      MutableRoaringBitmap segmentView = currentUpsertView.get(segment);
+      if (segmentView != null) {
+        segmentContext.setQueryableDocIdsSnapshot(segmentView);
+      }
+    }
+  }
+
+  /**
+   * SYNC-mode helper: sets a queryable-doc snapshot, taken directly from the segment, on every context whose segment
+   * is currently tracked. Contexts for untracked segments are left as they are.
+   */
+  private void setSegmentContexts(List<SegmentContext> segmentContexts) {
+    for (SegmentContext context : segmentContexts) {
+      IndexSegment indexSegment = context.getIndexSegment();
+      if (!_trackedSegments.contains(indexSegment)) {
+        continue;
+      }
+      MutableRoaringBitmap snapshot = UpsertUtils.getQueryableDocIdsSnapshotFromSegment(indexSegment, true);
+      context.setQueryableDocIdsSnapshot(snapshot);
+    }
+  }
+
+  /**
+   * Returns true when the current upsert view is recent enough to satisfy {@code upsertViewFreshnessMs}.
+   *
+   * @param upsertViewFreshnessMs maximum acceptable age of the view in milliseconds; a negative value means the view
+   *                              is never considered stale because of its age
+   * @return true if a refresh can be skipped based on the view's age alone
+   */
+  private boolean skipUpsertViewRefresh(long upsertViewFreshnessMs) {
+    if (upsertViewFreshnessMs < 0) {
+      return true;
+    }
+    // Compare the elapsed time with the bound instead of computing lastRefresh + freshness. That sum overflows to a
+    // negative value for very large freshness values (e.g. Long.MAX_VALUE), which would force a refresh on every
+    // call instead of never. For all non-overflowing inputs both forms are equivalent.
+    return System.currentTimeMillis() - _lastUpsertViewRefreshTimeMs < upsertViewFreshnessMs;
+  }
+
+  /**
+   * Rebuilds the shared upsert view ({@code _segmentQueryableDocIdsMap}) used by SNAPSHOT-mode queries.
+   *
+   * <p>Unless {@code forceRefresh} is set, the refresh is skipped when a view already exists and skipUpsertViewRefresh
+   * reports it fresh enough. The check is done once without the lock as a cheap fast path. It is repeated after
+   * taking the write lock of {@code _upsertViewLock}, because another thread may have refreshed the view meanwhile.
+   *
+   * <p>The new view contains exactly the tracked segments. Bitmaps are re-snapshotted for segments that are new to the
+   * view or recorded as updated since the last refresh, and reused otherwise. Segments no longer tracked are dropped.
+   *
+   * @param upsertViewFreshnessMs maximum acceptable age of the current view in milliseconds
+   * @param forceRefresh refresh regardless of the view's age
+   */
+  @VisibleForTesting
+  void doBatchRefreshUpsertView(long upsertViewFreshnessMs, boolean forceRefresh) {
+    // Always refresh if the current view is still empty.
+    if (!forceRefresh && skipUpsertViewRefresh(upsertViewFreshnessMs) && _segmentQueryableDocIdsMap != null) {
+      return;
+    }
+    // The write lock excludes the SNAPSHOT-mode bitmap updates, which run under the read lock, so the bitmaps
+    // snapshotted below form a consistent set.
+    _upsertViewLock.writeLock().lock();
+    try {
+      // Check again with lock, and always refresh if the current view is still empty.
+      Map<IndexSegment, MutableRoaringBitmap> current = _segmentQueryableDocIdsMap;
+      if (!forceRefresh && skipUpsertViewRefresh(upsertViewFreshnessMs) && current != null) {
+        return;
+      }
+      if (LOGGER.isDebugEnabled()) {
+        if (current == null) {
+          LOGGER.debug("Current upsert view is still null");
+        } else {
+          current.forEach(
+              (segment, bitmap) -> LOGGER.debug("Current upsert view of segment: {}, type: {}, total: {}, valid: {}",
+                  segment.getSegmentName(), (segment instanceof ImmutableSegment ? "imm" : "mut"),
+                  segment.getSegmentMetadata().getTotalDocs(), bitmap.getCardinality()));
+        }
+      }
+      Map<IndexSegment, MutableRoaringBitmap> updated = new HashMap<>();
+      for (IndexSegment segment : _trackedSegments) {
+        // Update bitmap for segment updated since last refresh or not in the view yet. This also handles segments
+        // that are tracked by _trackedSegments but not by _updatedSegmentsSinceLastRefresh, like those that didn't
+        // update any bitmaps because their docs simply lost all the upsert comparisons with the existing docs.
+        if (current == null || current.get(segment) == null || _updatedSegmentsSinceLastRefresh.contains(segment)) {
+          updated.put(segment, UpsertUtils.getQueryableDocIdsSnapshotFromSegment(segment, true));
+          if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Update upsert view of segment: {}, type: {}, total: {}, valid: {}, reason: {}",
+                segment.getSegmentName(), (segment instanceof ImmutableSegment ? "imm" : "mut"),
+                segment.getSegmentMetadata().getTotalDocs(), updated.get(segment).getCardinality(),
+                current == null || current.get(segment) == null ? "no view yet" : "bitmap updated");
+          }
+        } else {
+          updated.put(segment, current.get(segment));
+        }
+      }
+      // Swap in the new consistent set of bitmaps.
+      if (LOGGER.isDebugEnabled()) {
+        updated.forEach(
+            (segment, bitmap) -> LOGGER.debug("Updated upsert view of segment: {}, type: {}, total: {}, valid: {}",
+                segment.getSegmentName(), (segment instanceof ImmutableSegment ? "imm" : "mut"),
+                segment.getSegmentMetadata().getTotalDocs(), bitmap.getCardinality()));
+      }
+      _segmentQueryableDocIdsMap = updated;
+      // Safe to clear: writers add to this set only while holding the read lock, which they cannot acquire while
+      // this thread holds the write lock. So no update recorded after the snapshots above is lost.
+      _updatedSegmentsSinceLastRefresh.clear();
+      _lastUpsertViewRefreshTimeMs = System.currentTimeMillis();
+    } finally {
+      _upsertViewLock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Acquires the read lock that guards the set of tracked segments. Must be paired with
+   * {@link #unlockTrackedSegments()}. While it is held, trackSegment()/untrackSegment() cannot change the tracked set,
+   * which keeps segment-context collection consistent with it.
+   */
+  public void lockTrackedSegments() {
+    this._trackedSegmentsLock.readLock().lock();
+  }
+
+  /**
+   * Releases the tracked-segments read lock taken by {@link #lockTrackedSegments()}.
+   */
+  public void unlockTrackedSegments() {
+    this._trackedSegmentsLock.readLock().unlock();
+  }
+
+  /**
+   * Returns the names of the tracked mutable segments. This is the live internal set, not a copy, so it reflects
+   * later trackSegment()/untrackSegment() calls.
+   */
+  public Set<String> getOptionalSegments() {
+    return this._optionalSegments;
+  }
+
+  /**
+   * Starts tracking {@code segment} in the upsert view; a mutable segment is also registered by name as an optional
+   * segment.
+   *
+   * <p>In SNAPSHOT mode the view is force-refreshed while the write lock of {@code _trackedSegmentsLock} is still
+   * held. Queries that hold the tracked-segments read lock therefore wait until the refreshed view is in place. The
+   * lock order here (tracked-segments lock, then the view lock inside the refresh) matches the order used by queries.
+   */
+  public void trackSegment(IndexSegment segment) {
+    _trackedSegmentsLock.writeLock().lock();
+    try {
+      _trackedSegments.add(segment);
+      if (segment instanceof MutableSegment) {
+        _optionalSegments.add(segment.getSegmentName());
+      }
+      if (_consistencyMode == UpsertConfig.ConsistencyMode.SNAPSHOT) {
+        // Note: it's possible the segment is already tracked and the _trackedSegments doesn't really change here. But
+        // we should force to refresh the upsert view to include the latest bitmaps of the segments. This is
+        // important to fix a subtle race condition when committing mutable segment. During segment replacement, the
+        // queries can access both mutable and immutable segments. But as replacement is done, the new queries can
+        // only access the immutable segment, thus require latest bitmap of the segment in the upsert view.
+        // It's required to refresh with _trackedSegmentsLock so queries are blocked until upsert view is updated.
+        doBatchRefreshUpsertView(0, true);
+      }
+    } finally {
+      _trackedSegmentsLock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Stops tracking {@code segment}; a mutable segment is also removed from the optional segments. The SNAPSHOT view
+   * is not refreshed here. The segment's bitmap stays in the view until the next refresh rebuilds it from the tracked
+   * segments.
+   */
+  public void untrackSegment(IndexSegment segment) {
+    _trackedSegmentsLock.writeLock().lock();
+    try {
+      _trackedSegments.remove(segment);
+      if (segment instanceof MutableSegment) {
+        _optionalSegments.remove(segment.getSegmentName());
+      }
+      // No need to eagerly refresh the upsert view for SNAPSHOT mode when untracking a segment, as the untracked
+      // segment won't be used by any new queries, thus it can be removed when next refresh happens later.
+    } finally {
+      _trackedSegmentsLock.writeLock().unlock();
+    }
+  }
+
+  /** Test hook: the current SNAPSHOT-mode view, or null if it has never been refreshed. */
+  @VisibleForTesting
+  Map<IndexSegment, MutableRoaringBitmap> getSegmentQueryableDocIdsMap() {
+    return this._segmentQueryableDocIdsMap;
+  }
+
+  /** Test hook: the live set of segments whose bitmaps changed since the last view refresh. */
+  @VisibleForTesting
+  Set<IndexSegment> getUpdatedSegmentsSinceLastRefresh() {
+    return this._updatedSegmentsSinceLastRefresh;
+  }
+}
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java
index 9f86aa3326..fda7408e7a 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java
@@ -369,19 +369,19 @@ public class BasePartitionUpsertMetadataManagerTest {
       latch.await();
       return validDocIds01;
     });
-    upsertMetadataManager.trackSegment(seg01);
+    upsertMetadataManager.trackSegmentForUpsertView(seg01);
     segmentQueryableDocIdsMap.put(seg01, validDocIds01);
 
     IndexSegment seg02 = mock(IndexSegment.class);
     ThreadSafeMutableRoaringBitmap validDocIds02 = 
createThreadSafeMutableRoaringBitmap(11);
     when(seg02.getValidDocIds()).thenReturn(validDocIds02);
-    upsertMetadataManager.trackSegment(seg02);
+    upsertMetadataManager.trackSegmentForUpsertView(seg02);
     segmentQueryableDocIdsMap.put(seg02, validDocIds02);
 
     IndexSegment seg03 = mock(IndexSegment.class);
     ThreadSafeMutableRoaringBitmap validDocIds03 = 
createThreadSafeMutableRoaringBitmap(12);
     when(seg03.getValidDocIds()).thenReturn(validDocIds03);
-    upsertMetadataManager.trackSegment(seg03);
+    upsertMetadataManager.trackSegmentForUpsertView(seg03);
     segmentQueryableDocIdsMap.put(seg03, validDocIds03);
 
     List<SegmentContext> segmentContexts = new ArrayList<>();
@@ -400,7 +400,7 @@ public class BasePartitionUpsertMetadataManagerTest {
           Thread.sleep(10);
         }
         segmentQueryableDocIdsMap.forEach((k, v) -> segmentContexts.add(new 
SegmentContext(k)));
-        upsertMetadataManager.setSegmentContexts(segmentContexts, new 
HashMap<>());
+        
upsertMetadataManager.getUpsertViewManager().setSegmentContexts(segmentContexts,
 new HashMap<>());
         return System.currentTimeMillis() - startMs;
       });
       // The first thread can only finish after the latch is counted down 
after 2s.
@@ -435,7 +435,7 @@ public class BasePartitionUpsertMetadataManagerTest {
       throws Exception {
     UpsertContext upsertContext = mock(UpsertContext.class);
     
when(upsertContext.getConsistencyMode()).thenReturn(UpsertConfig.ConsistencyMode.SNAPSHOT);
-    when(upsertContext.getUpsertViewRefreshIntervalMs()).thenReturn(3000L);
+    when(upsertContext.getUpsertViewRefreshIntervalMs()).thenReturn(5L);
     DummyPartitionUpsertMetadataManager upsertMetadataManager =
         new DummyPartitionUpsertMetadataManager("myTable", 0, upsertContext);
 
@@ -443,31 +443,36 @@ public class BasePartitionUpsertMetadataManagerTest {
     Map<IndexSegment, ThreadSafeMutableRoaringBitmap> 
segmentQueryableDocIdsMap = new HashMap<>();
     IndexSegment seg01 = mock(IndexSegment.class);
     ThreadSafeMutableRoaringBitmap validDocIds01 = 
createThreadSafeMutableRoaringBitmap(10);
-    AtomicBoolean called = new AtomicBoolean(false);
-    when(seg01.getValidDocIds()).then(invocationOnMock -> {
-      called.set(true);
-      latch.await();
-      return validDocIds01;
-    });
-    upsertMetadataManager.trackSegment(seg01);
+    upsertMetadataManager.trackSegmentForUpsertView(seg01);
     segmentQueryableDocIdsMap.put(seg01, validDocIds01);
 
     IndexSegment seg02 = mock(IndexSegment.class);
     ThreadSafeMutableRoaringBitmap validDocIds02 = 
createThreadSafeMutableRoaringBitmap(11);
     when(seg02.getValidDocIds()).thenReturn(validDocIds02);
-    upsertMetadataManager.trackSegment(seg02);
+    upsertMetadataManager.trackSegmentForUpsertView(seg02);
     segmentQueryableDocIdsMap.put(seg02, validDocIds02);
 
     IndexSegment seg03 = mock(IndexSegment.class);
     ThreadSafeMutableRoaringBitmap validDocIds03 = 
createThreadSafeMutableRoaringBitmap(12);
     when(seg03.getValidDocIds()).thenReturn(validDocIds03);
-    upsertMetadataManager.trackSegment(seg03);
+    upsertMetadataManager.trackSegmentForUpsertView(seg03);
     segmentQueryableDocIdsMap.put(seg03, validDocIds03);
 
     List<SegmentContext> segmentContexts = new ArrayList<>();
     ExecutorService executor = Executors.newFixedThreadPool(2);
+
+    // Set up the awaiting logic here to not block the 
trackSegmentForUpsertView methods above, as they refresh the
+    // upsert view.
+    AtomicBoolean called = new AtomicBoolean(false);
+    when(seg01.getValidDocIds()).then(invocationOnMock -> {
+      called.set(true);
+      latch.await();
+      return validDocIds01;
+    });
     try {
-      // This thread does replaceDocId and takes WLock first, and it'll 
refresh the upsert view
+      // This thread does replaceDocId and takes WLock first, and it'll 
refresh the upsert view Sleep a bit here to
+      // make the upsert view stale before doing the replaceDocId, to force it 
to refresh the upsert view.
+      Thread.sleep(10);
       executor.submit(() -> {
         RecordInfo recordInfo = new RecordInfo(null, 5, null, false);
         upsertMetadataManager.replaceDocId(seg03, validDocIds03, null, seg01, 
0, 12, recordInfo);
@@ -481,7 +486,7 @@ public class BasePartitionUpsertMetadataManagerTest {
         }
         segmentQueryableDocIdsMap.forEach((k, v) -> segmentContexts.add(new 
SegmentContext(k)));
         // This thread reuses the upsert view refreshed by the first thread 
above.
-        upsertMetadataManager.setSegmentContexts(segmentContexts, new 
HashMap<>());
+        
upsertMetadataManager.getUpsertViewManager().setSegmentContexts(segmentContexts,
 new HashMap<>());
         return System.currentTimeMillis() - startMs;
       });
       // The first thread can only finish after the latch is counted down 
after 2s.
@@ -494,14 +499,14 @@ public class BasePartitionUpsertMetadataManagerTest {
       executor.shutdownNow();
     }
     assertEquals(segmentContexts.size(), 3);
-    assertEquals(upsertMetadataManager.getSegmentQueryableDocIdsMap().size(), 
3);
-    
assertTrue(upsertMetadataManager.getUpdatedSegmentsSinceLastRefresh().isEmpty());
+    
assertEquals(upsertMetadataManager.getUpsertViewManager().getSegmentQueryableDocIdsMap().size(),
 3);
+    
assertTrue(upsertMetadataManager.getUpsertViewManager().getUpdatedSegmentsSinceLastRefresh().isEmpty());
 
     // Get the upsert view again, and the existing bitmap objects should be 
set in segment contexts.
     // The segmentContexts initialized above holds the same bitmaps objects as 
from the upsert view.
     List<SegmentContext> reuseSegmentContexts = new ArrayList<>();
     segmentQueryableDocIdsMap.forEach((k, v) -> reuseSegmentContexts.add(new 
SegmentContext(k)));
-    upsertMetadataManager.setSegmentContexts(reuseSegmentContexts, new 
HashMap<>());
+    
upsertMetadataManager.getUpsertViewManager().setSegmentContexts(reuseSegmentContexts,
 new HashMap<>());
     for (SegmentContext reuseSC : reuseSegmentContexts) {
       for (SegmentContext sc : segmentContexts) {
         if (reuseSC.getIndexSegment() == sc.getIndexSegment()) {
@@ -524,12 +529,13 @@ public class BasePartitionUpsertMetadataManagerTest {
     }
 
     // Force refresh the upsert view when getting it, so different bitmap 
objects should be set in segment contexts.
-    
upsertMetadataManager.getUpdatedSegmentsSinceLastRefresh().addAll(Arrays.asList(seg01,
 seg02, seg03));
+    
upsertMetadataManager.getUpsertViewManager().getUpdatedSegmentsSinceLastRefresh()
+        .addAll(Arrays.asList(seg01, seg02, seg03));
     List<SegmentContext> refreshSegmentContexts = new ArrayList<>();
     segmentQueryableDocIdsMap.forEach((k, v) -> refreshSegmentContexts.add(new 
SegmentContext(k)));
     Map<String, String> queryOptions = new HashMap<>();
     queryOptions.put("upsertViewFreshnessMs", "0");
-    upsertMetadataManager.setSegmentContexts(refreshSegmentContexts, 
queryOptions);
+    
upsertMetadataManager.getUpsertViewManager().setSegmentContexts(refreshSegmentContexts,
 queryOptions);
     for (SegmentContext refreshSC : refreshSegmentContexts) {
       for (SegmentContext sc : segmentContexts) {
         if (refreshSC.getIndexSegment() == sc.getIndexSegment()) {
@@ -561,25 +567,23 @@ public class BasePartitionUpsertMetadataManagerTest {
     DummyPartitionUpsertMetadataManager upsertMetadataManager =
         new DummyPartitionUpsertMetadataManager("myTable", 0, upsertContext);
 
-    CountDownLatch latch = new CountDownLatch(1);
     Map<IndexSegment, ThreadSafeMutableRoaringBitmap> 
segmentQueryableDocIdsMap = new HashMap<>();
     IndexSegment seg01 = mock(IndexSegment.class);
     ThreadSafeMutableRoaringBitmap validDocIds01 = 
createThreadSafeMutableRoaringBitmap(10);
-    AtomicBoolean called = new AtomicBoolean(false);
     when(seg01.getValidDocIds()).thenReturn(validDocIds01);
-    upsertMetadataManager.trackSegment(seg01);
+    upsertMetadataManager.trackSegmentForUpsertView(seg01);
     segmentQueryableDocIdsMap.put(seg01, validDocIds01);
 
     IndexSegment seg02 = mock(IndexSegment.class);
     ThreadSafeMutableRoaringBitmap validDocIds02 = 
createThreadSafeMutableRoaringBitmap(11);
     when(seg02.getValidDocIds()).thenReturn(validDocIds02);
-    upsertMetadataManager.trackSegment(seg02);
+    upsertMetadataManager.trackSegmentForUpsertView(seg02);
     segmentQueryableDocIdsMap.put(seg02, validDocIds02);
 
     IndexSegment seg03 = mock(IndexSegment.class);
     ThreadSafeMutableRoaringBitmap validDocIds03 = 
createThreadSafeMutableRoaringBitmap(12);
     when(seg03.getValidDocIds()).thenReturn(validDocIds03);
-    upsertMetadataManager.trackSegment(seg03);
+    upsertMetadataManager.trackSegmentForUpsertView(seg03);
     segmentQueryableDocIdsMap.put(seg03, validDocIds03);
 
     RecordInfo recordInfo = new RecordInfo(null, 5, null, false);
@@ -587,20 +591,22 @@ public class BasePartitionUpsertMetadataManagerTest {
 
     List<SegmentContext> segmentContexts = new ArrayList<>();
     segmentQueryableDocIdsMap.forEach((k, v) -> segmentContexts.add(new 
SegmentContext(k)));
-    upsertMetadataManager.setSegmentContexts(segmentContexts, new HashMap<>());
+    
upsertMetadataManager.getUpsertViewManager().setSegmentContexts(segmentContexts,
 new HashMap<>());
     assertEquals(segmentContexts.size(), 3);
-    assertEquals(upsertMetadataManager.getSegmentQueryableDocIdsMap().size(), 
3);
+    
assertEquals(upsertMetadataManager.getUpsertViewManager().getSegmentQueryableDocIdsMap().size(),
 3);
 
     // We can force to refresh the upsert view by clearing up the current 
upsert view, even though there are no updated
     // segments tracked in _updatedSegmentsSinceLastRefresh.
-    
assertTrue(upsertMetadataManager.getUpdatedSegmentsSinceLastRefresh().isEmpty());
-    upsertMetadataManager.getSegmentQueryableDocIdsMap().clear();
+    
assertEquals(upsertMetadataManager.getUpsertViewManager().getUpdatedSegmentsSinceLastRefresh().size(),
 2);
+    
assertTrue(upsertMetadataManager.getUpsertViewManager().getUpdatedSegmentsSinceLastRefresh().contains(seg01));
+    
assertTrue(upsertMetadataManager.getUpsertViewManager().getUpdatedSegmentsSinceLastRefresh().contains(seg03));
+    
upsertMetadataManager.getUpsertViewManager().getSegmentQueryableDocIdsMap().clear();
 
     List<SegmentContext> refreshSegmentContexts = new ArrayList<>();
     segmentQueryableDocIdsMap.forEach((k, v) -> refreshSegmentContexts.add(new 
SegmentContext(k)));
     Map<String, String> queryOptions = new HashMap<>();
     queryOptions.put("upsertViewFreshnessMs", "0");
-    upsertMetadataManager.setSegmentContexts(refreshSegmentContexts, 
queryOptions);
+    
upsertMetadataManager.getUpsertViewManager().setSegmentContexts(refreshSegmentContexts,
 queryOptions);
     for (SegmentContext refreshSC : refreshSegmentContexts) {
       for (SegmentContext sc : segmentContexts) {
         if (refreshSC.getIndexSegment() == sc.getIndexSegment()) {
@@ -612,6 +618,7 @@ public class BasePartitionUpsertMetadataManagerTest {
       // The upsert view holds a clone of the original queryableDocIds held by 
the segment object.
       assertNotSame(refreshSC.getQueryableDocIdsSnapshot(), 
validDocIds.getMutableRoaringBitmap());
       assertEquals(refreshSC.getQueryableDocIdsSnapshot(), 
validDocIds.getMutableRoaringBitmap());
+      assertNotNull(refreshSC.getQueryableDocIdsSnapshot());
       // docId=0 in seg01 got invalidated.
       if (refreshSC.getIndexSegment() == seg01) {
         assertFalse(refreshSC.getQueryableDocIdsSnapshot().contains(0));
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/UpsertViewManagerTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/UpsertViewManagerTest.java
new file mode 100644
index 0000000000..dd179028ab
--- /dev/null
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/UpsertViewManagerTest.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.upsert;
+
+import java.util.Collections;
+import java.util.HashMap;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.MutableSegment;
+import org.apache.pinot.segment.spi.SegmentContext;
+import 
org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
+import org.apache.pinot.spi.config.table.UpsertConfig;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.*;
+
+
+public class UpsertViewManagerTest {
+
+  /**
+   * In SYNC mode, a tracked mutable segment is listed as optional and its segment context receives the segment's
+   * bitmap. Once untracked, it is no longer optional and a new context for it is left without a snapshot.
+   */
+  @Test
+  public void testTrackUntrackSegments() {
+    MutableRoaringBitmap expectedBitmap = new MutableRoaringBitmap();
+    IndexSegment segment = mockMutableSegment("seg1", expectedBitmap);
+    UpsertViewManager viewManager =
+        new UpsertViewManager(UpsertConfig.ConsistencyMode.SYNC, mock(UpsertContext.class));
+
+    // While tracked: the segment is optional and its context gets the segment's bitmap.
+    SegmentContext trackedContext = new SegmentContext(segment);
+    viewManager.trackSegment(segment);
+    assertEquals(viewManager.getOptionalSegments(), Collections.singleton(segment.getSegmentName()));
+    viewManager.setSegmentContexts(Collections.singletonList(trackedContext), new HashMap<>());
+    assertSame(trackedContext.getQueryableDocIdsSnapshot(), expectedBitmap);
+
+    // After untracking: no longer optional, and a fresh context is left untouched.
+    viewManager.untrackSegment(segment);
+    assertTrue(viewManager.getOptionalSegments().isEmpty());
+    SegmentContext untrackedContext = new SegmentContext(segment);
+    viewManager.setSegmentContexts(Collections.singletonList(untrackedContext), new HashMap<>());
+    assertNull(untrackedContext.getQueryableDocIdsSnapshot());
+  }
+
+  /** Builds a mocked mutable segment named {@code name} whose valid doc ids resolve to {@code bitmap}. */
+  private static IndexSegment mockMutableSegment(String name, MutableRoaringBitmap bitmap) {
+    ThreadSafeMutableRoaringBitmap validDocIds = mock(ThreadSafeMutableRoaringBitmap.class);
+    when(validDocIds.getMutableRoaringBitmap()).thenReturn(bitmap);
+    IndexSegment segment = mock(MutableSegment.class);
+    when(segment.getValidDocIds()).thenReturn(validDocIds);
+    when(segment.getSegmentName()).thenReturn(name);
+    return segment;
+  }
+}
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
index 90338cc7af..9e72b8b76d 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
@@ -383,6 +383,11 @@ public class TableConfig extends BaseJsonConfig {
     return _upsertConfig != null && _upsertConfig.getMode() != 
UpsertConfig.Mode.NONE;
   }
 
+  /**
+   * Returns the upsert consistency mode configured for this table, or {@link UpsertConfig.ConsistencyMode#NONE} when
+   * the table has no upsert config.
+   */
+  @JsonIgnore
+  public UpsertConfig.ConsistencyMode getUpsertConsistencyMode() {
+    if (_upsertConfig == null) {
+      return UpsertConfig.ConsistencyMode.NONE;
+    }
+    return _upsertConfig.getConsistencyMode();
+  }
+
   @JsonIgnore
   @Nullable
   public List<String> getUpsertComparisonColumns() {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to