This is an automated email from the ASF dual-hosted git repository.

ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new bd32dc9a64 Ensure upsert deletion consistency when enabled with 
compaction flow (#13347)
bd32dc9a64 is described below

commit bd32dc9a642fbbdfef45e066ed00e97f66b80a5d
Author: Pratik Tibrewal <[email protected]>
AuthorDate: Fri Aug 16 04:03:42 2024 +0530

    Ensure upsert deletion consistency when enabled with compaction flow 
(#13347)
---
 .../upsert/BaseTableUpsertMetadataManager.java     |    6 +-
 ...nUpsertMetadataManagerForConsistentDeletes.java |  389 +++++++
 .../ConcurrentMapTableUpsertMetadataManager.java   |   12 +-
 .../pinot/segment/local/upsert/UpsertContext.java  |   30 +-
 .../pinot/segment/local/upsert/UpsertUtils.java    |   20 +
 .../segment/local/utils/TableConfigUtils.java      |   25 +
 ...ertMetadataManagerForConsistentDeletesTest.java | 1212 ++++++++++++++++++++
 .../TableUpsertMetadataManagerFactoryTest.java     |   60 +
 .../segment/local/utils/TableConfigUtilsTest.java  |   77 ++
 .../pinot/spi/config/table/UpsertConfig.java       |   11 +
 10 files changed, 1834 insertions(+), 8 deletions(-)

diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
index 6d77bbc535..400519079b 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
@@ -41,6 +41,7 @@ public abstract class BaseTableUpsertMetadataManager 
implements TableUpsertMetad
   protected UpsertContext _context;
   protected UpsertConfig.ConsistencyMode _consistencyMode;
   protected boolean _enablePreload;
+  protected boolean _enableDeletedKeysCompactionConsistency;
 
   @Override
   public void init(TableConfig tableConfig, Schema schema, TableDataManager 
tableDataManager) {
@@ -71,6 +72,7 @@ public abstract class BaseTableUpsertMetadataManager 
implements TableUpsertMetad
         enableSnapshot && upsertConfig.isEnablePreload() && 
tableDataManager.getSegmentPreloadExecutor() != null;
     double metadataTTL = upsertConfig.getMetadataTTL();
     double deletedKeysTTL = upsertConfig.getDeletedKeysTTL();
+    _enableDeletedKeysCompactionConsistency = 
upsertConfig.isEnableDeletedKeysCompactionConsistency();
     _consistencyMode = upsertConfig.getConsistencyMode();
     if (_consistencyMode == null) {
       _consistencyMode = UpsertConfig.ConsistencyMode.NONE;
@@ -83,7 +85,9 @@ public abstract class BaseTableUpsertMetadataManager 
implements TableUpsertMetad
         
.setPartialUpsertHandler(partialUpsertHandler).setEnableSnapshot(enableSnapshot)
         
.setEnablePreload(_enablePreload).setMetadataTTL(metadataTTL).setDeletedKeysTTL(deletedKeysTTL)
         
.setConsistencyMode(_consistencyMode).setUpsertViewRefreshIntervalMs(upsertViewRefreshIntervalMs)
-        
.setTableIndexDir(tableIndexDir).setTableDataManager(tableDataManager).build();
+        
.setTableIndexDir(tableIndexDir).setDropOutOfOrderRecord(upsertConfig.isDropOutOfOrderRecord())
+        
.setEnableDeletedKeysCompactionConsistency(_enableDeletedKeysCompactionConsistency)
+        .setTableDataManager(tableDataManager).build();
     LOGGER.info(
         "Initialized {} for table: {} with primary key columns: {}, comparison 
columns: {}, delete record column: {},"
             + " hash function: {}, upsert mode: {}, enable snapshot: {}, 
enable preload: {}, metadata TTL: {},"
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java
new file mode 100644
index 0000000000..7e8f510737
--- /dev/null
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java
@@ -0,0 +1,389 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.upsert;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.pinot.common.metrics.ServerMeter;
+import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
+import org.apache.pinot.segment.local.segment.readers.LazyRow;
+import org.apache.pinot.segment.local.utils.HashUtils;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.MutableSegment;
+import 
org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.PrimaryKey;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+
+
+/**
+ * Implementation of {@link PartitionUpsertMetadataManager} that is backed by 
a {@link ConcurrentHashMap} and ensures
+ * consistent deletions. This should be used when the table is configured with 
'enableDeletedKeysCompactionConsistency' set to true.
+ *
+ * Consistent deletion ensures that when deletedKeysTTL is enabled with 
UpsertCompaction, the key metadata is
+ * removed from the HashMap only after all other records in the old segments 
are compacted. This guarantees
+ * data consistency. Without this, there can be a scenario where a deleted 
record is compacted first, while an
+ * old record remains non-compacted in a previous segment. During a server 
restart, this could lead to the old
+ * record reappearing. For the end-user, this would result in a data loss or 
inconsistency scenario, as the
+ * record was marked for deletion.
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+@ThreadSafe
+public class ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes
+    extends BasePartitionUpsertMetadataManager {
+
+  @VisibleForTesting
+  final ConcurrentHashMap<Object, 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.RecordLocation>
+      _primaryKeyToRecordLocationMap = new ConcurrentHashMap<>();
+  // Used to initialize a reference to previous row for merging in partial 
upsert
+  private final LazyRow _reusePreviousRow = new LazyRow();
+  private final Map<String, Object> _reuseMergeResultHolder = new HashMap<>();
+
+  public 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes(String 
tableNameWithType, int partitionId,
+      UpsertContext context) {
+    super(tableNameWithType, partitionId, context);
+  }
+
+  @Override
+  protected long getNumPrimaryKeys() {
+    return _primaryKeyToRecordLocationMap.size();
+  }
+
+  @Override
+  protected void doAddOrReplaceSegment(ImmutableSegmentImpl segment, 
ThreadSafeMutableRoaringBitmap validDocIds,
+      @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, 
Iterator<RecordInfo> recordInfoIterator,
+      @Nullable IndexSegment oldSegment, @Nullable MutableRoaringBitmap 
validDocIdsForOldSegment) {
+    String segmentName = segment.getSegmentName();
+    segment.enableUpsert(this, validDocIds, queryableDocIds);
+
+    AtomicInteger numKeysInWrongSegment = new AtomicInteger();
+    while (recordInfoIterator.hasNext()) {
+      RecordInfo recordInfo = recordInfoIterator.next();
+      int newDocId = recordInfo.getDocId();
+      Comparable newComparisonValue = recordInfo.getComparisonValue();
+      
_primaryKeyToRecordLocationMap.compute(HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(),
 _hashFunction),
+          (primaryKey, currentRecordLocation) -> {
+            if (currentRecordLocation != null) {
+              // Existing primary key
+              IndexSegment currentSegment = currentRecordLocation.getSegment();
+              int currentDocId = currentRecordLocation.getDocId();
+              Comparable currentComparisonValue = 
currentRecordLocation.getComparisonValue();
+              int comparisonResult = 
newComparisonValue.compareTo(currentComparisonValue);
+              int currentDistinctSegmentCount = 
currentRecordLocation.getDistinctSegmentCount();
+
+              // The current record is in the same segment
+              // Update the record location when there is a tie to keep the 
newer record. Note that the record info
+              // iterator will return records with incremental doc ids.
+              if (currentSegment == segment) {
+                if (comparisonResult >= 0) {
+                  replaceDocId(segment, validDocIds, queryableDocIds, 
currentDocId, newDocId, recordInfo);
+                  return new 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.RecordLocation(segment,
+                      newDocId, newComparisonValue, 
currentDistinctSegmentCount);
+                } else {
+                  return currentRecordLocation;
+                }
+              }
+
+              // The current record is in an old segment being replaced
+              // This could happen when committing a consuming segment, or 
reloading a completed segment. In this
+              // case, we want to update the record location when there is a 
tie because the record locations should
+              // point to the new added segment instead of the old segment 
being replaced. Also, do not update the valid
+              // doc ids for the old segment because it has not been replaced 
yet. We pass in an optional valid doc ids
+              // snapshot for the old segment, which can be updated and used 
to track the docs not replaced yet.
+              if (currentSegment == oldSegment) {
+                if (comparisonResult >= 0) {
+                  addDocId(segment, validDocIds, queryableDocIds, newDocId, 
recordInfo);
+                  if (validDocIdsForOldSegment != null) {
+                    validDocIdsForOldSegment.remove(currentDocId);
+                  }
+                  return new RecordLocation(segment, newDocId, 
newComparisonValue,
+                      
RecordLocation.incrementSegmentCount(currentDistinctSegmentCount));
+                } else {
+                  return new RecordLocation(currentSegment, currentDocId, 
currentComparisonValue,
+                      
RecordLocation.incrementSegmentCount(currentDistinctSegmentCount));
+                }
+              }
+
+              // This should not happen because the previously replaced 
segment should have all keys removed. We still
+              // handle it here, and also track the number of keys not 
properly replaced previously.
+              String currentSegmentName = currentSegment.getSegmentName();
+              if (currentSegmentName.equals(segmentName)) {
+                numKeysInWrongSegment.getAndIncrement();
+                if (comparisonResult >= 0) {
+                  addDocId(segment, validDocIds, queryableDocIds, newDocId, 
recordInfo);
+                  return new RecordLocation(segment, newDocId, 
newComparisonValue,
+                      
RecordLocation.incrementSegmentCount(currentDistinctSegmentCount));
+                } else {
+                  return currentRecordLocation;
+                }
+              }
+
+              // The current record is in a different segment
+              // Update the record location when getting a newer comparison 
value, or the value is the same as the
+              // current value, but the segment has a larger sequence number 
(the segment is newer than the current
+              // segment).
+              if (comparisonResult > 0 || (comparisonResult == 0 && 
shouldReplaceOnComparisonTie(segmentName,
+                  currentSegmentName, 
segment.getSegmentMetadata().getIndexCreationTime(),
+                  
currentSegment.getSegmentMetadata().getIndexCreationTime()))) {
+                replaceDocId(segment, validDocIds, queryableDocIds, 
currentSegment, currentDocId, newDocId, recordInfo);
+                return new RecordLocation(segment, newDocId, 
newComparisonValue,
+                    
RecordLocation.incrementSegmentCount(currentDistinctSegmentCount));
+              } else {
+                return new RecordLocation(currentSegment, currentDocId, 
currentComparisonValue,
+                    
RecordLocation.incrementSegmentCount(currentDistinctSegmentCount));
+              }
+            } else {
+              // New primary key
+              addDocId(segment, validDocIds, queryableDocIds, newDocId, 
recordInfo);
+              return new RecordLocation(segment, newDocId, newComparisonValue, 
1);
+            }
+          });
+    }
+    int numKeys = numKeysInWrongSegment.get();
+    if (numKeys > 0) {
+      _logger.warn("Found {} primary keys in the wrong segment when adding 
segment: {}", numKeys, segmentName);
+      _serverMetrics.addMeteredTableValue(_tableNameWithType, 
ServerMeter.UPSERT_KEYS_IN_WRONG_SEGMENT, numKeys);
+    }
+  }
+
+  @Override
+  protected void addSegmentWithoutUpsert(ImmutableSegmentImpl segment, 
ThreadSafeMutableRoaringBitmap validDocIds,
+      @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, 
Iterator<RecordInfo> recordInfoIterator) {
+    throw new UnsupportedOperationException("Consistent-deletion does not 
support preloading of segments.");
+  }
+
+  @Override
+  protected void doRemoveSegment(IndexSegment segment) {
+    String segmentName = segment.getSegmentName();
+    _logger.info("Removing {} segment: {}, current primary key count: {}",
+        segment instanceof ImmutableSegment ? "immutable" : "mutable", 
segmentName, getNumPrimaryKeys());
+    long startTimeMs = System.currentTimeMillis();
+
+    try (
+        UpsertUtils.PrimaryKeyReader primaryKeyReader = new 
UpsertUtils.PrimaryKeyReader(segment, _primaryKeyColumns)) {
+      removeSegment(segment,
+          UpsertUtils.getPrimaryKeyIterator(primaryKeyReader, 
segment.getSegmentMetadata().getTotalDocs()));
+    } catch (Exception e) {
+      throw new RuntimeException(
+          String.format("Caught exception while removing segment: %s, table: 
%s", segment.getSegmentName(),
+              _tableNameWithType), e);
+    }
+
+    // Update metrics
+    long numPrimaryKeys = getNumPrimaryKeys();
+    updatePrimaryKeyGauge(numPrimaryKeys);
+    _logger.info("Finished removing segment: {} in {}ms, current primary key 
count: {}", segmentName,
+        System.currentTimeMillis() - startTimeMs, numPrimaryKeys);
+  }
+
+  @Override
+  protected void removeSegment(IndexSegment segment, Iterator<PrimaryKey> 
primaryKeyIterator) {
+    // We need to decrease the distinctSegmentCount for each unique primary 
key in this deleting segment by 1
+    // as the occurrence of the key in this segment is being removed. We are 
taking a set of unique primary keys
+    // to avoid double counting the same key in the same segment.
+    Set<Object> uniquePrimaryKeys = new HashSet<>();
+    while (primaryKeyIterator.hasNext()) {
+      PrimaryKey primaryKey = primaryKeyIterator.next();
+      
_primaryKeyToRecordLocationMap.computeIfPresent(HashUtils.hashPrimaryKey(primaryKey,
 _hashFunction),
+          (pk, recordLocation) -> {
+            if (recordLocation.getSegment() == segment) {
+              return null;
+            }
+            if (!uniquePrimaryKeys.add(pk)) {
+              return recordLocation;
+            }
+            return new RecordLocation(recordLocation.getSegment(), 
recordLocation.getDocId(),
+                recordLocation.getComparisonValue(),
+                
RecordLocation.decrementSegmentCount(recordLocation.getDistinctSegmentCount()));
+          });
+    }
+  }
+
+  @Override
+  public void doRemoveExpiredPrimaryKeys() {
+    AtomicInteger numDeletedTTLKeysRemoved = new AtomicInteger();
+    double largestSeenComparisonValue = _largestSeenComparisonValue.get();
+    double deletedKeysThreshold;
+    if (_deletedKeysTTL > 0) {
+      deletedKeysThreshold = largestSeenComparisonValue - _deletedKeysTTL;
+    } else {
+      deletedKeysThreshold = Double.MIN_VALUE;
+    }
+
+    _primaryKeyToRecordLocationMap.forEach((primaryKey, recordLocation) -> {
+      double comparisonValue = ((Number) 
recordLocation.getComparisonValue()).doubleValue();
+      // We need to verify that the record belongs to only one segment. If a 
record is part of multiple segments,
+      // an issue can arise where the upsert compaction might first process 
the segment containing the delete record
+      // while the previous segment(s) are not compacted. Upon restart, this 
can inadvertently revive the key
+      // that was originally marked for deletion.
+      if (_deletedKeysTTL > 0 && comparisonValue < deletedKeysThreshold
+          && recordLocation.getDistinctSegmentCount() <= 1) {
+        ThreadSafeMutableRoaringBitmap currentQueryableDocIds = 
recordLocation.getSegment().getQueryableDocIds();
+        // if key not part of queryable doc id, it means it is deleted
+        if (currentQueryableDocIds != null && 
!currentQueryableDocIds.contains(recordLocation.getDocId())) {
+          _primaryKeyToRecordLocationMap.remove(primaryKey, recordLocation);
+          removeDocId(recordLocation.getSegment(), recordLocation.getDocId());
+          numDeletedTTLKeysRemoved.getAndIncrement();
+        }
+      }
+    });
+
+    // Update metrics
+    updatePrimaryKeyGauge();
+    int numDeletedTTLKeys = numDeletedTTLKeysRemoved.get();
+    if (numDeletedTTLKeys > 0) {
+      _logger.info("Deleted {} primary keys based on deletedKeysTTL", 
numDeletedTTLKeys);
+      _serverMetrics.addMeteredTableValue(_tableNameWithType, 
ServerMeter.DELETED_KEYS_TTL_PRIMARY_KEYS_REMOVED,
+          numDeletedTTLKeys);
+    }
+  }
+
+  @Override
+  protected boolean doAddRecord(MutableSegment segment, RecordInfo recordInfo) 
{
+    AtomicBoolean isOutOfOrderRecord = new AtomicBoolean(false);
+    ThreadSafeMutableRoaringBitmap validDocIds = 
Objects.requireNonNull(segment.getValidDocIds());
+    ThreadSafeMutableRoaringBitmap queryableDocIds = 
segment.getQueryableDocIds();
+    int newDocId = recordInfo.getDocId();
+    Comparable newComparisonValue = recordInfo.getComparisonValue();
+
+    // When TTL is enabled, update largestSeenComparisonValue when adding new 
record
+    if (_deletedKeysTTL > 0) {
+      double comparisonValue = ((Number) newComparisonValue).doubleValue();
+      _largestSeenComparisonValue.getAndUpdate(v -> Math.max(v, 
comparisonValue));
+    }
+
+    
_primaryKeyToRecordLocationMap.compute(HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(),
 _hashFunction),
+        (primaryKey, currentRecordLocation) -> {
+          if (currentRecordLocation != null) {
+            // Existing primary key
+            IndexSegment currentSegment = currentRecordLocation.getSegment();
+            // Update the record location when the new comparison value is 
greater than or equal to the current value.
+            // Update the record location when there is a tie to keep the 
newer record.
+            if 
(newComparisonValue.compareTo(currentRecordLocation.getComparisonValue()) >= 0) 
{
+              int currentDocId = currentRecordLocation.getDocId();
+              if (segment == currentSegment) {
+                replaceDocId(segment, validDocIds, queryableDocIds, 
currentDocId, newDocId, recordInfo);
+                return new RecordLocation(segment, newDocId, 
newComparisonValue,
+                    currentRecordLocation.getDistinctSegmentCount());
+              } else {
+                replaceDocId(segment, validDocIds, queryableDocIds, 
currentSegment, currentDocId, newDocId, recordInfo);
+                return new RecordLocation(segment, newDocId, 
newComparisonValue,
+                    
RecordLocation.incrementSegmentCount(currentRecordLocation.getDistinctSegmentCount()));
+              }
+            } else {
+              // Out-of-order record
+              
handleOutOfOrderEvent(currentRecordLocation.getComparisonValue(), 
recordInfo.getComparisonValue());
+              isOutOfOrderRecord.set(true);
+              if (segment == currentSegment) {
+                return currentRecordLocation;
+              } else {
+                return new RecordLocation(currentSegment, 
currentRecordLocation.getDocId(),
+                    currentRecordLocation.getComparisonValue(),
+                    _context.isDropOutOfOrderRecord() ? 
currentRecordLocation.getDistinctSegmentCount()
+                        : 
RecordLocation.incrementSegmentCount(currentRecordLocation.getDistinctSegmentCount()));
+              }
+            }
+          } else {
+            // New primary key
+            addDocId(segment, validDocIds, queryableDocIds, newDocId, 
recordInfo);
+            return new RecordLocation(segment, newDocId, newComparisonValue, 
1);
+          }
+        });
+
+    updatePrimaryKeyGauge();
+    return !isOutOfOrderRecord.get();
+  }
+
+  @Override
+  protected GenericRow doUpdateRecord(GenericRow record, RecordInfo 
recordInfo) {
+    assert _partialUpsertHandler != null;
+    
_primaryKeyToRecordLocationMap.computeIfPresent(HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(),
 _hashFunction),
+        (pk, recordLocation) -> {
+          // Read the previous record if the following conditions are met:
+          // - New record is not a DELETE record
+          // - New record is not out-of-order
+          // - Previous record is not deleted
+          if (!recordInfo.isDeleteRecord()
+              && 
recordInfo.getComparisonValue().compareTo(recordLocation.getComparisonValue()) 
>= 0) {
+            IndexSegment currentSegment = recordLocation.getSegment();
+            ThreadSafeMutableRoaringBitmap currentQueryableDocIds = 
currentSegment.getQueryableDocIds();
+            int currentDocId = recordLocation.getDocId();
+            if (currentQueryableDocIds == null || 
currentQueryableDocIds.contains(currentDocId)) {
+              _reusePreviousRow.init(currentSegment, currentDocId);
+              _partialUpsertHandler.merge(_reusePreviousRow, record, 
_reuseMergeResultHolder);
+              _reuseMergeResultHolder.clear();
+            }
+          }
+          return recordLocation;
+        });
+    return record;
+  }
+
+  @VisibleForTesting
+  static class RecordLocation {
+    private final IndexSegment _segment;
+    private final int _docId;
+    private final Comparable _comparisonValue;
+    // The number of distinct segments in which the record is present. If this 
count is less than or equal to 1,
+    // we proceed to remove the record from the primary hashmap during the 
deletedKeysTTL process.
+    private final int _distinctSegmentCount;
+
+    public RecordLocation(IndexSegment indexSegment, int docId, Comparable 
comparisonValue, int distinctSegmentCount) {
+      _segment = indexSegment;
+      _docId = docId;
+      _comparisonValue = comparisonValue;
+      _distinctSegmentCount = distinctSegmentCount;
+    }
+
+    public static int incrementSegmentCount(int count) {
+      return count + 1;
+    }
+
+    public static int decrementSegmentCount(int count) {
+      return count - 1;
+    }
+
+    public IndexSegment getSegment() {
+      return _segment;
+    }
+
+    public int getDocId() {
+      return _docId;
+    }
+
+    public Comparable getComparisonValue() {
+      return _comparisonValue;
+    }
+
+    public int getDistinctSegmentCount() {
+      return _distinctSegmentCount;
+    }
+  }
+}
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
index 7b216acfc3..45a6487a82 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
@@ -35,18 +35,20 @@ import org.apache.pinot.spi.config.table.UpsertConfig;
  */
 @ThreadSafe
 public class ConcurrentMapTableUpsertMetadataManager extends 
BaseTableUpsertMetadataManager {
-  private final Map<Integer, ConcurrentMapPartitionUpsertMetadataManager> 
_partitionMetadataManagerMap =
+  private final Map<Integer, BasePartitionUpsertMetadataManager> 
_partitionMetadataManagerMap =
       new ConcurrentHashMap<>();
 
   @Override
-  public ConcurrentMapPartitionUpsertMetadataManager 
getOrCreatePartitionManager(int partitionId) {
+  public BasePartitionUpsertMetadataManager getOrCreatePartitionManager(int 
partitionId) {
     return _partitionMetadataManagerMap.computeIfAbsent(partitionId,
-        k -> new 
ConcurrentMapPartitionUpsertMetadataManager(_tableNameWithType, k, _context));
+        k -> _enableDeletedKeysCompactionConsistency
+            ? new 
ConcurrentMapPartitionUpsertMetadataManager(_tableNameWithType, k, _context)
+            : new 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes(_tableNameWithType,
 k, _context));
   }
 
   @Override
   public void stop() {
-    for (ConcurrentMapPartitionUpsertMetadataManager metadataManager : 
_partitionMetadataManagerMap.values()) {
+    for (BasePartitionUpsertMetadataManager metadataManager : 
_partitionMetadataManagerMap.values()) {
       metadataManager.stop();
     }
   }
@@ -82,7 +84,7 @@ public class ConcurrentMapTableUpsertMetadataManager extends 
BaseTableUpsertMeta
   @Override
   public void close()
       throws IOException {
-    for (ConcurrentMapPartitionUpsertMetadataManager metadataManager : 
_partitionMetadataManagerMap.values()) {
+    for (BasePartitionUpsertMetadataManager metadataManager : 
_partitionMetadataManagerMap.values()) {
       metadataManager.close();
     }
   }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertContext.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertContext.java
index 837d94a963..69323c0e65 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertContext.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertContext.java
@@ -45,13 +45,16 @@ public class UpsertContext {
   private final UpsertConfig.ConsistencyMode _consistencyMode;
   private final long _upsertViewRefreshIntervalMs;
   private final File _tableIndexDir;
+  private final boolean _dropOutOfOrderRecord;
+  private final boolean _enableDeletedKeysCompactionConsistency;
   private final TableDataManager _tableDataManager;
 
   private UpsertContext(TableConfig tableConfig, Schema schema, List<String> 
primaryKeyColumns,
       List<String> comparisonColumns, @Nullable String deleteRecordColumn, 
HashFunction hashFunction,
       @Nullable PartialUpsertHandler partialUpsertHandler, boolean 
enableSnapshot, boolean enablePreload,
       double metadataTTL, double deletedKeysTTL, UpsertConfig.ConsistencyMode 
consistencyMode,
-      long upsertViewRefreshIntervalMs, File tableIndexDir, @Nullable 
TableDataManager tableDataManager) {
+      long upsertViewRefreshIntervalMs, File tableIndexDir, boolean 
dropOutOfOrderRecord,
+      boolean enableDeletedKeysCompactionConsistency, @Nullable 
TableDataManager tableDataManager) {
     _tableConfig = tableConfig;
     _schema = schema;
     _primaryKeyColumns = primaryKeyColumns;
@@ -66,6 +69,8 @@ public class UpsertContext {
     _consistencyMode = consistencyMode;
     _upsertViewRefreshIntervalMs = upsertViewRefreshIntervalMs;
     _tableIndexDir = tableIndexDir;
+    _dropOutOfOrderRecord = dropOutOfOrderRecord;
+    _enableDeletedKeysCompactionConsistency = 
enableDeletedKeysCompactionConsistency;
     _tableDataManager = tableDataManager;
   }
 
@@ -125,6 +130,14 @@ public class UpsertContext {
     return _tableIndexDir;
   }
 
+  public boolean isDropOutOfOrderRecord() {
+    return _dropOutOfOrderRecord;
+  }
+
+  public boolean isEnableDeletedKeysCompactionConsistency() {
+    return _enableDeletedKeysCompactionConsistency;
+  }
+
   public TableDataManager getTableDataManager() {
     return _tableDataManager;
   }
@@ -144,6 +157,8 @@ public class UpsertContext {
     private UpsertConfig.ConsistencyMode _consistencyMode;
     private long _upsertViewRefreshIntervalMs;
     private File _tableIndexDir;
+    private boolean _dropOutOfOrderRecord;
+    private boolean _enableDeletedKeysCompactionConsistency;
     private TableDataManager _tableDataManager;
 
     public Builder setTableConfig(TableConfig tableConfig) {
@@ -216,6 +231,16 @@ public class UpsertContext {
       return this;
     }
 
+    public Builder setDropOutOfOrderRecord(boolean dropOutOfOrderRecord) {
+      _dropOutOfOrderRecord = dropOutOfOrderRecord;
+      return this;
+    }
+
+    public Builder setEnableDeletedKeysCompactionConsistency(boolean 
enableDeletedKeysCompactionConsistency) {
+      _enableDeletedKeysCompactionConsistency = 
enableDeletedKeysCompactionConsistency;
+      return this;
+    }
+
     public Builder setTableDataManager(TableDataManager tableDataManager) {
       _tableDataManager = tableDataManager;
       return this;
@@ -230,7 +255,8 @@ public class UpsertContext {
       Preconditions.checkState(_tableIndexDir != null, "Table index directory 
must be set");
       return new UpsertContext(_tableConfig, _schema, _primaryKeyColumns, 
_comparisonColumns, _deleteRecordColumn,
           _hashFunction, _partialUpsertHandler, _enableSnapshot, 
_enablePreload, _metadataTTL, _deletedKeysTTL,
-          _consistencyMode, _upsertViewRefreshIntervalMs, _tableIndexDir, 
_tableDataManager);
+          _consistencyMode, _upsertViewRefreshIntervalMs, _tableIndexDir, 
_dropOutOfOrderRecord,
+          _enableDeletedKeysCompactionConsistency, _tableDataManager);
     }
   }
 }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
index 6484c354f9..47d6a72743 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
@@ -108,6 +108,26 @@ public class UpsertUtils {
     };
   }
 
+  /**
+   * Returns an iterator of {@link PrimaryKey} for all the documents from the 
segment.
+   */
+  public static Iterator<PrimaryKey> getPrimaryKeyIterator(PrimaryKeyReader 
primaryKeyReader,
+      int numDocs) {
+    return new Iterator<>() {
+      private int _docId = 0;
+
+      @Override
+      public boolean hasNext() {
+        return _docId < numDocs;
+      }
+
+      @Override
+      public PrimaryKey next() {
+        return primaryKeyReader.getPrimaryKey(_docId++);
+      }
+    };
+  }
+
   public static class RecordInfoReader implements Closeable {
     private final PrimaryKeyReader _primaryKeyReader;
     private final ComparisonColumnReader _comparisonColumnReader;
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index 16597eebd5..54460fa47c 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -858,6 +858,31 @@ public final class TableConfigUtils {
       }
     }
 
+
+    if (upsertConfig != null && 
upsertConfig.isEnableDeletedKeysCompactionConsistency()) {
+      // enableDeletedKeysCompactionConsistency shouldn't exist with 
metadataTTL
+      Preconditions.checkState(upsertConfig.getMetadataTTL() == 0,
+          "enableDeletedKeysCompactionConsistency and metadataTTL shouldn't 
exist together for upsert table");
+
+      // enableDeletedKeysCompactionConsistency shouldn't exist with 
enablePreload
+      Preconditions.checkState(!upsertConfig.isEnablePreload(),
+          "enableDeletedKeysCompactionConsistency and enablePreload shouldn't 
exist together for upsert table");
+
+      // enableDeletedKeysCompactionConsistency should exist with 
deletedKeysTTL
+      Preconditions.checkState(upsertConfig.getDeletedKeysTTL() > 0,
+          "enableDeletedKeysCompactionConsistency should exist with 
deletedKeysTTL for upsert table");
+
+      // enableDeletedKeysCompactionConsistency should exist with 
enableSnapshot
+      Preconditions.checkState(upsertConfig.isEnableSnapshot(),
+          "enableDeletedKeysCompactionConsistency should exist with 
enableSnapshot for upsert table");
+
+      // enableDeletedKeysCompactionConsistency should exist with 
UpsertCompactionTask
+      TableTaskConfig taskConfig = tableConfig.getTaskConfig();
+      Preconditions.checkState(taskConfig != null
+              && 
taskConfig.getTaskTypeConfigsMap().containsKey(UPSERT_COMPACTION_TASK_TYPE),
+          "enableDeletedKeysCompactionConsistency should exist with 
UpsertCompactionTask for upsert table");
+    }
+
     Preconditions.checkState(
         tableConfig.getInstanceAssignmentConfigMap() == null || 
!tableConfig.getInstanceAssignmentConfigMap()
             .containsKey(InstancePartitionsType.COMPLETED),
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest.java
new file mode 100644
index 0000000000..4a6ab831cb
--- /dev/null
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest.java
@@ -0,0 +1,1212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.upsert;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.annotation.Nullable;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.common.utils.UploadedRealtimeSegmentName;
+import org.apache.pinot.segment.local.indexsegment.immutable.EmptyIndexSegment;
+import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
+import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
+import org.apache.pinot.segment.local.utils.HashUtils;
+import org.apache.pinot.segment.spi.ColumnMetadata;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.MutableSegment;
+import org.apache.pinot.segment.spi.datasource.DataSource;
+import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
+import 
org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
+import org.apache.pinot.spi.config.table.HashFunction;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.PrimaryKey;
+import org.apache.pinot.spi.utils.ByteArray;
+import org.apache.pinot.spi.utils.BytesUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.util.TestUtils;
+import org.mockito.MockedConstruction;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockConstruction;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.*;
+
+
+/**
+ * This class tries to replicate the behaviour for {@code 
ConcurrentMapPartitionUpsertMetadataManagerTest} assuming
+ * that _enableConsistentDeletes is enabled, and accordingly we set all the 
params in {@code setUpContextBuilder}.
+ * We have removed preload and metadataTTL unit-tests for now as we don't 
allow them along with
+ * _enableConsistentDeletes.
+ */
+public class 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest {
+  private static final String RAW_TABLE_NAME = "testTable";
+  private static final String REALTIME_TABLE_NAME = 
TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME);
+  private static final List<String> PRIMARY_KEY_COLUMNS = 
Collections.singletonList("pk");
+  private static final List<String> COMPARISON_COLUMNS = 
Collections.singletonList("timeCol");
+  private static final String DELETE_RECORD_COLUMN = "deleteCol";
+  private static final File INDEX_DIR =
+      new File(FileUtils.getTempDirectory(), 
"ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest");
+
+  private UpsertContext.Builder _contextBuilder;
+
+  private static ImmutableSegmentImpl mockImmutableSegment(int sequenceNumber,
+      ThreadSafeMutableRoaringBitmap validDocIds, @Nullable 
ThreadSafeMutableRoaringBitmap queryableDocIds,
+      List<PrimaryKey> primaryKeys) {
+    ImmutableSegmentImpl segment = mock(ImmutableSegmentImpl.class);
+    when(segment.getSegmentName()).thenReturn(getSegmentName(sequenceNumber));
+    when(segment.getValidDocIds()).thenReturn(validDocIds);
+    when(segment.getQueryableDocIds()).thenReturn(queryableDocIds);
+    DataSource dataSource = mock(DataSource.class);
+    when(segment.getDataSource(anyString())).thenReturn(dataSource);
+    ForwardIndexReader forwardIndex = mock(ForwardIndexReader.class);
+    when(forwardIndex.isSingleValue()).thenReturn(true);
+    when(forwardIndex.getStoredType()).thenReturn(FieldSpec.DataType.INT);
+    when(forwardIndex.getInt(anyInt(), any())).thenAnswer(
+        invocation -> 
primaryKeys.get(invocation.getArgument(0)).getValues()[0]);
+    when(dataSource.getForwardIndex()).thenReturn(forwardIndex);
+    SegmentMetadataImpl segmentMetadata = mock(SegmentMetadataImpl.class);
+    
when(segmentMetadata.getIndexCreationTime()).thenReturn(System.currentTimeMillis());
+    when(segment.getSegmentMetadata()).thenReturn(segmentMetadata);
+    return segment;
+  }
+
+  private static ImmutableSegmentImpl mockUploadedImmutableSegment(String 
suffix,
+      ThreadSafeMutableRoaringBitmap validDocIds, @Nullable 
ThreadSafeMutableRoaringBitmap queryableDocIds,
+      List<PrimaryKey> primaryKeys, Long creationTimeMs) {
+    if (creationTimeMs == null) {
+      creationTimeMs = System.currentTimeMillis();
+    }
+    ImmutableSegmentImpl segment = mock(ImmutableSegmentImpl.class);
+    
when(segment.getSegmentName()).thenReturn(getUploadedRealtimeSegmentName(creationTimeMs,
 suffix));
+    when(segment.getValidDocIds()).thenReturn(validDocIds);
+    when(segment.getQueryableDocIds()).thenReturn(queryableDocIds);
+    DataSource dataSource = mock(DataSource.class);
+    when(segment.getDataSource(anyString())).thenReturn(dataSource);
+    ForwardIndexReader forwardIndex = mock(ForwardIndexReader.class);
+    when(forwardIndex.isSingleValue()).thenReturn(true);
+    when(forwardIndex.getStoredType()).thenReturn(FieldSpec.DataType.INT);
+    when(forwardIndex.getInt(anyInt(), any())).thenAnswer(
+        invocation -> 
primaryKeys.get(invocation.getArgument(0)).getValues()[0]);
+    when(dataSource.getForwardIndex()).thenReturn(forwardIndex);
+    SegmentMetadataImpl segmentMetadata = mock(SegmentMetadataImpl.class);
+    when(segmentMetadata.getIndexCreationTime()).thenReturn(creationTimeMs);
+    when(segment.getSegmentMetadata()).thenReturn(segmentMetadata);
+    return segment;
+  }
+
+  private static ImmutableSegmentImpl 
mockImmutableSegmentWithSegmentMetadata(int sequenceNumber,
+      ThreadSafeMutableRoaringBitmap validDocIds, @Nullable 
ThreadSafeMutableRoaringBitmap queryableDocIds,
+      List<PrimaryKey> primaryKeys, SegmentMetadataImpl segmentMetadata, 
MutableRoaringBitmap snapshot) {
+    ImmutableSegmentImpl segment = mockImmutableSegment(sequenceNumber, 
validDocIds, queryableDocIds, primaryKeys);
+    when(segment.getSegmentMetadata()).thenReturn(segmentMetadata);
+    when(segment.loadValidDocIdsFromSnapshot()).thenReturn(snapshot);
+    return segment;
+  }
+
+  private static EmptyIndexSegment mockEmptySegment(int sequenceNumber) {
+    SegmentMetadataImpl segmentMetadata = mock(SegmentMetadataImpl.class);
+    when(segmentMetadata.getName()).thenReturn(getSegmentName(sequenceNumber));
+    return new EmptyIndexSegment(segmentMetadata);
+  }
+
+  private static MutableSegment mockMutableSegment(int sequenceNumber, 
ThreadSafeMutableRoaringBitmap validDocIds,
+      ThreadSafeMutableRoaringBitmap queryableDocIds) {
+    MutableSegment segment = mock(MutableSegment.class);
+    when(segment.getSegmentName()).thenReturn(getSegmentName(sequenceNumber));
+    when(segment.getQueryableDocIds()).thenReturn(queryableDocIds);
+    when(segment.getValidDocIds()).thenReturn(validDocIds);
+    return segment;
+  }
+
+  private static String getSegmentName(int sequenceNumber) {
+    return new LLCSegmentName(RAW_TABLE_NAME, 0, sequenceNumber, 
System.currentTimeMillis()).toString();
+  }
+
+  private static String getUploadedRealtimeSegmentName(long creationTimeMs, 
String suffix) {
+    return new UploadedRealtimeSegmentName(RAW_TABLE_NAME, 0, creationTimeMs, 
"uploaded", suffix).toString();
+  }
+
+  private static PrimaryKey makePrimaryKey(int value) {
+    return new PrimaryKey(new Object[]{value});
+  }
+
+  private static void checkRecordLocation(
+      Map<Object, 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.RecordLocation> 
recordLocationMap,
+      int keyValue, IndexSegment segment, int docId, int comparisonValue, 
HashFunction hashFunction) {
+    
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.RecordLocation 
recordLocation =
+        
recordLocationMap.get(HashUtils.hashPrimaryKey(makePrimaryKey(keyValue), 
hashFunction));
+    assertNotNull(recordLocation);
+    assertSame(recordLocation.getSegment(), segment);
+    assertEquals(recordLocation.getDocId(), docId);
+    assertEquals(((Integer) recordLocation.getComparisonValue()), 
comparisonValue);
+  }
+
+  @BeforeClass
+  public void setUp()
+      throws IOException {
+    FileUtils.forceMkdir(INDEX_DIR);
+    ServerMetrics.register(mock(ServerMetrics.class));
+  }
+
+  @BeforeMethod
+  public void setUpContextBuilder() {
+    _contextBuilder = new 
UpsertContext.Builder().setTableConfig(mock(TableConfig.class)).setSchema(mock(Schema.class))
+        
.setPrimaryKeyColumns(PRIMARY_KEY_COLUMNS).setComparisonColumns(COMPARISON_COLUMNS)
+        
.setEnableDeletedKeysCompactionConsistency(true).setTableIndexDir(INDEX_DIR).setEnableSnapshot(true)
+        .setDeleteRecordColumn(DELETE_RECORD_COLUMN).setDeletedKeysTTL(20);
+  }
+
+  @AfterClass
+  public void tearDown()
+      throws IOException {
+    FileUtils.forceDelete(INDEX_DIR);
+  }
+
+  @Test
+  public void testStartFinishOperation() {
+    ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes 
upsertMetadataManager =
+        new 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes(REALTIME_TABLE_NAME,
 0,
+            _contextBuilder.build());
+
+    // Start 2 operations
+    assertTrue(upsertMetadataManager.startOperation());
+    assertTrue(upsertMetadataManager.startOperation());
+
+    // Stop and close the metadata manager
+    AtomicBoolean stopped = new AtomicBoolean();
+    AtomicBoolean closed = new AtomicBoolean();
+    // Avoid early finalization by not using Executors.newSingleThreadExecutor 
(java <= 20, JDK-8145304)
+    ExecutorService executor = Executors.newFixedThreadPool(1);
+    executor.submit(() -> {
+      upsertMetadataManager.stop();
+      stopped.set(true);
+      try {
+        upsertMetadataManager.close();
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      closed.set(true);
+    });
+    executor.shutdown();
+
+    // Wait for metadata manager to be stopped
+    TestUtils.waitForCondition(aVoid -> stopped.get(), 10_000L, "Failed to 
stop the metadata manager");
+
+    // Metadata manager should block on close because there are 2 pending 
operations
+    assertFalse(closed.get());
+
+    // Starting new operation should fail because the metadata manager is 
already stopped
+    assertFalse(upsertMetadataManager.startOperation());
+
+    // Finish one operation
+    upsertMetadataManager.finishOperation();
+
+    // Metadata manager should still block on close because there is still 1 
pending operation
+    assertFalse(closed.get());
+
+    // Finish the other operation
+    upsertMetadataManager.finishOperation();
+
+    // Metadata manager should be closed now
+    TestUtils.waitForCondition(aVoid -> closed.get(), 10_000L, "Failed to 
close the metadata manager");
+  }
+
+  @Test
+  public void testAddReplaceRemoveSegment()
+      throws IOException {
+    verifyAddReplaceRemoveSegment(HashFunction.NONE);
+    verifyAddReplaceRemoveSegment(HashFunction.MD5);
+    verifyAddReplaceRemoveSegment(HashFunction.MURMUR3);
+  }
+
+  @Test
+  public void testGetQueryableDocIds() {
+    boolean[] deleteFlags1 = new boolean[]{false, false, false, true, true, 
false};
+    int[] docIds1 = new int[]{2, 4, 5};
+    MutableRoaringBitmap validDocIdsSnapshot1 = new MutableRoaringBitmap();
+    validDocIdsSnapshot1.add(docIds1);
+    MutableRoaringBitmap queryableDocIds1 = new MutableRoaringBitmap();
+    queryableDocIds1.add(new int[]{2, 5});
+    verifyGetQueryableDocIds(false, deleteFlags1, validDocIdsSnapshot1, 
queryableDocIds1);
+
+    // all records are not deleted
+    boolean[] deleteFlags2 = new boolean[]{false, false, false, false, false, 
false};
+    int[] docIds2 = new int[]{2, 4, 5};
+    MutableRoaringBitmap validDocIdsSnapshot2 = new MutableRoaringBitmap();
+    validDocIdsSnapshot2.add(docIds2);
+    MutableRoaringBitmap queryableDocIds2 = new MutableRoaringBitmap();
+    queryableDocIds2.add(docIds2);
+    verifyGetQueryableDocIds(false, deleteFlags2, validDocIdsSnapshot2, 
queryableDocIds2);
+
+    // delete column has null values
+    boolean[] deleteFlags3 = new boolean[]{false, false, false, false, false, 
false};
+    int[] docIds3 = new int[]{2, 4, 5};
+    MutableRoaringBitmap validDocIdsSnapshot3 = new MutableRoaringBitmap();
+    validDocIdsSnapshot3.add(docIds3);
+    MutableRoaringBitmap queryableDocIds3 = new MutableRoaringBitmap();
+    queryableDocIds3.add(docIds3);
+    verifyGetQueryableDocIds(true, deleteFlags3, validDocIdsSnapshot3, 
queryableDocIds3);
+
+    // All records are delete records.
+    boolean[] deleteFlags4 = new boolean[]{true, true, true, true, true, true};
+    int[] docIds4 = new int[]{2, 4, 5};
+    MutableRoaringBitmap validDocIdsSnapshot4 = new MutableRoaringBitmap();
+    validDocIdsSnapshot4.add(docIds4);
+    MutableRoaringBitmap queryableDocIds4 = new MutableRoaringBitmap();
+    verifyGetQueryableDocIds(false, deleteFlags4, validDocIdsSnapshot4, 
queryableDocIds4);
+  }
+
+  private void verifyAddReplaceRemoveSegment(HashFunction hashFunction)
+      throws IOException {
+    ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes 
upsertMetadataManager =
+        new 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes(REALTIME_TABLE_NAME,
 0,
+            _contextBuilder.setHashFunction(hashFunction).build());
+    Map<Object, 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.RecordLocation> 
recordLocationMap =
+        upsertMetadataManager._primaryKeyToRecordLocationMap;
+    Set<IndexSegment> trackedSegments = upsertMetadataManager._trackedSegments;
+
+    // Add the first segment
+    int numRecords = 6;
+    int[] primaryKeys = new int[]{0, 1, 2, 0, 1, 0};
+    int[] timestamps = new int[]{100, 100, 100, 80, 120, 100};
+    ThreadSafeMutableRoaringBitmap validDocIds1 = new 
ThreadSafeMutableRoaringBitmap();
+    List<PrimaryKey> primaryKeys1 = getPrimaryKeyList(numRecords, primaryKeys);
+    SegmentMetadataImpl segmentMetadata1 = mock(SegmentMetadataImpl.class);
+    
when(segmentMetadata1.getIndexCreationTime()).thenReturn(System.currentTimeMillis());
+    when(segmentMetadata1.getTotalDocs()).thenReturn(numRecords);
+    ImmutableSegmentImpl segment1 =
+        mockImmutableSegmentWithSegmentMetadata(1, validDocIds1, null, 
primaryKeys1, segmentMetadata1, null);
+    List<RecordInfo> recordInfoList1;
+    // get recordInfo from validDocIdSnapshot.
+    // segment1 snapshot: 0 -> {5, 100}, 1 -> {4, 120}, 2 -> {2, 100}
+    int[] docIds1 = new int[]{2, 4, 5};
+    MutableRoaringBitmap validDocIdsSnapshot1 = new MutableRoaringBitmap();
+    validDocIdsSnapshot1.add(docIds1);
+    recordInfoList1 = getRecordInfoList(validDocIdsSnapshot1, primaryKeys, 
timestamps, null);
+    upsertMetadataManager.addSegment(segment1, validDocIds1, null, 
recordInfoList1.iterator());
+    trackedSegments.add(segment1);
+    // segment1: 0 -> {5, 100}, 1 -> {4, 120}, 2 -> {2, 100}
+    assertEquals(recordLocationMap.size(), 3);
+    checkRecordLocation(recordLocationMap, 0, segment1, 5, 100, hashFunction);
+    checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, hashFunction);
+    checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, hashFunction);
+    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{2, 4, 5});
+
+    // Add the second segment
+    numRecords = 5;
+    primaryKeys = new int[]{0, 1, 2, 3, 0};
+    timestamps = new int[]{100, 100, 120, 80, 80};
+    ThreadSafeMutableRoaringBitmap validDocIds2 = new 
ThreadSafeMutableRoaringBitmap();
+    SegmentMetadataImpl segmentMetadata2 = mock(SegmentMetadataImpl.class);
+    
when(segmentMetadata2.getIndexCreationTime()).thenReturn(System.currentTimeMillis());
+    when(segmentMetadata2.getTotalDocs()).thenReturn(numRecords);
+    ImmutableSegmentImpl segment2 =
+        mockImmutableSegmentWithSegmentMetadata(2, validDocIds2, null, 
getPrimaryKeyList(numRecords, primaryKeys),
+            segmentMetadata2, null);
+    List<RecordInfo> recordInfoList2;
+    // get recordInfo from validDocIdSnapshot.
+    // segment2 snapshot: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
+    // segment1 snapshot: 1 -> {4, 120}
+    MutableRoaringBitmap validDocIdsSnapshot2 = new MutableRoaringBitmap();
+    validDocIdsSnapshot2.add(0, 2, 3);
+    recordInfoList2 = getRecordInfoList(validDocIdsSnapshot2, primaryKeys, 
timestamps, null);
+    upsertMetadataManager.addSegment(segment2, validDocIds2, null, 
recordInfoList2.iterator());
+    trackedSegments.add(segment2);
+
+    // segment1: 1 -> {4, 120}
+    // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
+    assertEquals(recordLocationMap.size(), 4);
+    checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction);
+    checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, hashFunction);
+    checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction);
+    checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction);
+    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{4});
+    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{0, 2, 3});
+
+    // Add an empty segment
+    EmptyIndexSegment emptySegment = mockEmptySegment(3);
+    upsertMetadataManager.addSegment(emptySegment);
+    // segment1: 1 -> {4, 120}
+    // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
+    assertEquals(recordLocationMap.size(), 4);
+    checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction);
+    checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, hashFunction);
+    checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction);
+    checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction);
+    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{4});
+    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{0, 2, 3});
+
+    // Replace (reload) the first segment
+    ThreadSafeMutableRoaringBitmap newValidDocIds1 = new 
ThreadSafeMutableRoaringBitmap();
+    SegmentMetadataImpl newSegmentMetadata1 = mock(SegmentMetadataImpl.class);
+    
when(newSegmentMetadata1.getIndexCreationTime()).thenReturn(System.currentTimeMillis());
+    when(newSegmentMetadata1.getTotalDocs()).thenReturn(primaryKeys1.size());
+    ImmutableSegmentImpl newSegment1 =
+        mockImmutableSegmentWithSegmentMetadata(1, newValidDocIds1, null, 
primaryKeys1, newSegmentMetadata1, null);
+    upsertMetadataManager.replaceSegment(newSegment1, newValidDocIds1, null, 
recordInfoList1.iterator(), segment1);
+    trackedSegments.add(newSegment1);
+    trackedSegments.remove(segment1);
+    // original segment1: 1 -> {4, 120} (not in the map)
+    // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
+    // new segment1: 1 -> {4, 120}
+    assertEquals(recordLocationMap.size(), 4);
+    checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction);
+    checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction);
+    checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction);
+    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{4});
+    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{0, 2, 3});
+    assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{4});
+
+    // Remove the original segment1
+    upsertMetadataManager.removeSegment(segment1);
+    // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
+    // new segment1: 1 -> {4, 120}
+    assertEquals(recordLocationMap.size(), 4);
+    checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction);
+    checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction);
+    checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction);
+    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{4});
+    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{0, 2, 3});
+    assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{4});
+
+    // Remove the empty segment
+    upsertMetadataManager.removeSegment(emptySegment);
+    // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
+    // new segment1: 1 -> {4, 120}
+    assertEquals(recordLocationMap.size(), 4);
+    checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction);
+    checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction);
+    checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction);
+    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{0, 2, 3});
+    assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{4});
+
+    // Remove segment2
+    upsertMetadataManager.removeSegment(segment2);
+    // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80} (not in the map)
+    // new segment1: 1 -> {4, 120}
+    assertEquals(recordLocationMap.size(), 1);
+    checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, 
hashFunction);
+    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{0, 2, 3});
+    assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{4});
+    assertEquals(trackedSegments, Collections.singleton(newSegment1));
+
+    // Stop the metadata manager
+    upsertMetadataManager.stop();
+
+    // Remove new segment1, should be no-op
+    upsertMetadataManager.removeSegment(newSegment1);
+    // new segment1: 1 -> {4, 120}
+    assertEquals(recordLocationMap.size(), 1);
+    checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, 
hashFunction);
+    assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{4});
+    assertEquals(trackedSegments, Collections.singleton(newSegment1));
+
+    // Close the metadata manager
+    upsertMetadataManager.close();
+  }
+
+  @Test
+  public void testAddReplaceRemoveSegmentWithRecordDelete()
+      throws IOException {
+    verifyAddReplaceRemoveSegmentWithRecordDelete(HashFunction.NONE);
+    verifyAddReplaceRemoveSegmentWithRecordDelete(HashFunction.MD5);
+    verifyAddReplaceRemoveSegmentWithRecordDelete(HashFunction.MURMUR3);
+  }
+
+  @Test
+  public void verifyAddReplaceUploadedSegment1()
+      throws IOException {
+    ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes 
upsertMetadataManager =
+        new 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes(REALTIME_TABLE_NAME,
 0,
+            _contextBuilder.setHashFunction(HashFunction.NONE).build());
+    Map<Object, 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.RecordLocation> 
recordLocationMap =
+        upsertMetadataManager._primaryKeyToRecordLocationMap;
+    Set<IndexSegment> trackedSegments = upsertMetadataManager._trackedSegments;
+
+    // Add the first segment
+    int numRecords = 6;
+    int[] primaryKeys = new int[]{0, 1, 2, 0, 1, 0};
+    int[] timestamps = new int[]{100, 100, 100, 80, 120, 100};
+    ThreadSafeMutableRoaringBitmap validDocIds1 = new 
ThreadSafeMutableRoaringBitmap();
+    List<PrimaryKey> primaryKeys1 = getPrimaryKeyList(numRecords, primaryKeys);
+    SegmentMetadataImpl segmentMetadata = mock(SegmentMetadataImpl.class);
+    when(segmentMetadata.getIndexCreationTime()).thenReturn(1000L);
+    ImmutableSegmentImpl segment1 =
+        mockImmutableSegmentWithSegmentMetadata(1, validDocIds1, null, 
primaryKeys1, segmentMetadata, null);
+    List<RecordInfo> recordInfoList1;
+    // get recordInfo by iterating all records.
+    recordInfoList1 = getRecordInfoList(numRecords, primaryKeys, timestamps, 
null);
+    upsertMetadataManager.addSegment(segment1, validDocIds1, null, 
recordInfoList1.iterator());
+    trackedSegments.add(segment1);
+    // segment1: 0 -> {5, 100}, 1 -> {4, 120}, 2 -> {2, 100}
+    assertEquals(recordLocationMap.size(), 3);
+    checkRecordLocation(recordLocationMap, 0, segment1, 5, 100, 
HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, 
HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, 
HashFunction.NONE);
+    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{2, 4, 5});
+
+    // Add the second segment of uploaded name format with same creation time
+    numRecords = 2;
+    primaryKeys = new int[]{0, 3};
+    timestamps = new int[]{100, 80};
+    ThreadSafeMutableRoaringBitmap validDocIds2 = new 
ThreadSafeMutableRoaringBitmap();
+    ImmutableSegmentImpl uploadedSegment2 =
+        mockUploadedImmutableSegment("2", validDocIds2, null, 
getPrimaryKeyList(numRecords, primaryKeys), 1000L);
+    List<RecordInfo> recordInfoList2;
+    // get recordInfo by iterating all records.
+    recordInfoList2 = getRecordInfoList(numRecords, primaryKeys, timestamps, 
null);
+    upsertMetadataManager.addSegment(uploadedSegment2, validDocIds2, null, 
recordInfoList2.iterator());
+    trackedSegments.add(uploadedSegment2);
+
+    // segment1: 1 -> {4, 120}, 2 -> {2, 100}
+    // uploadedSegment2: 0 -> {0, 100}, 3 -> {1, 80}
+    assertEquals(recordLocationMap.size(), 4);
+    checkRecordLocation(recordLocationMap, 0, uploadedSegment2, 0, 100, 
HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, 
HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, 
HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 3, uploadedSegment2, 1, 80, 
HashFunction.NONE);
+    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{2, 4});
+    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{0, 1});
+
+    // replace uploadedSegment2
+    ThreadSafeMutableRoaringBitmap newValidDocIds2 = new 
ThreadSafeMutableRoaringBitmap();
+    ImmutableSegmentImpl newUploadedSegment2 =
+        mockUploadedImmutableSegment("2", newValidDocIds2, null, 
getPrimaryKeyList(numRecords, primaryKeys), 1020L);
+    upsertMetadataManager.replaceSegment(newUploadedSegment2, newValidDocIds2, 
null, recordInfoList2.iterator(),
+        uploadedSegment2);
+    trackedSegments.add(newUploadedSegment2);
+    trackedSegments.remove(uploadedSegment2);
+
+    // segment1: 1 -> {4, 120}, 2 -> {2, 100}
+    // newUploadedSegment2: 0 -> {0, 100}, 3 -> {1, 80}
+    assertEquals(recordLocationMap.size(), 4);
+    checkRecordLocation(recordLocationMap, 0, newUploadedSegment2, 0, 100, 
HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, 
HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, 
HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 3, newUploadedSegment2, 1, 80, 
HashFunction.NONE);
+    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{2, 4});
+    assertEquals(newValidDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{0, 1});
+
+    // add uploadedSegment3 with higher creation time than newUploadedSegment2
+    numRecords = 1;
+    primaryKeys = new int[]{0};
+    timestamps = new int[]{100};
+    ThreadSafeMutableRoaringBitmap validDocIds3 = new 
ThreadSafeMutableRoaringBitmap();
+    ImmutableSegmentImpl uploadedSegment3 =
+        mockUploadedImmutableSegment("3", validDocIds3, null, 
getPrimaryKeyList(numRecords, primaryKeys), 1040L);
+    List<RecordInfo> recordInfoList3;
+    // get recordInfo by iterating all records.
+    recordInfoList3 = getRecordInfoList(numRecords, primaryKeys, timestamps, 
null);
+    upsertMetadataManager.addSegment(uploadedSegment3, validDocIds3, null, 
recordInfoList3.iterator());
+
+    // segment1: 1 -> {4, 120}, 2 -> {2, 100}
+    // newUploadedSegment2: 3 -> {1, 80}
+    // uploadedSegment3: 0 -> {0, 100}
+    assertEquals(recordLocationMap.size(), 4);
+    checkRecordLocation(recordLocationMap, 0, uploadedSegment3, 0, 100, 
HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, 
HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, 
HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 3, newUploadedSegment2, 1, 80, 
HashFunction.NONE);
+    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{2, 4});
+    assertEquals(newValidDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{1});
+    assertEquals(validDocIds3.getMutableRoaringBitmap().toArray(), new 
int[]{0});
+
+    // add uploadedSegment4 with higher creation time than segment 1 and same 
creation time as uploadedSegment3
+    numRecords = 2;
+    primaryKeys = new int[]{0, 1};
+    timestamps = new int[]{100, 120};
+    ThreadSafeMutableRoaringBitmap validDocIds4 = new 
ThreadSafeMutableRoaringBitmap();
+    ImmutableSegmentImpl uploadedSegment4 =
+        mockUploadedImmutableSegment("4", validDocIds4, null, 
getPrimaryKeyList(numRecords, primaryKeys), 1040L);
+    List<RecordInfo> recordInfoList4;
+    // get recordInfo by iterating all records.
+    recordInfoList4 = getRecordInfoList(numRecords, primaryKeys, timestamps, 
null);
+    upsertMetadataManager.addSegment(uploadedSegment4, validDocIds4, null, 
recordInfoList4.iterator());
+
+    // segment1: 2 -> {2, 100}
+    // newUploadedSegment2: 3 -> {1, 80}
+    // uploadedSegment3: 0 -> {0, 100}
+    // uploadedSegment4: 1 -> {1, 120}
+    assertEquals(recordLocationMap.size(), 4);
+    checkRecordLocation(recordLocationMap, 0, uploadedSegment3, 0, 100, 
HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 1, uploadedSegment4, 1, 120, 
HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, 
HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 3, newUploadedSegment2, 1, 80, 
HashFunction.NONE);
+    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{2});
+    assertEquals(newValidDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{1});
+    assertEquals(validDocIds3.getMutableRoaringBitmap().toArray(), new 
int[]{0});
+    assertEquals(validDocIds4.getMutableRoaringBitmap().toArray(), new 
int[]{1});
+
+    // remove segments
+    upsertMetadataManager.removeSegment(segment1);
+    upsertMetadataManager.removeSegment(uploadedSegment2);
+    upsertMetadataManager.removeSegment(newUploadedSegment2);
+    upsertMetadataManager.removeSegment(uploadedSegment3);
+    upsertMetadataManager.removeSegment(uploadedSegment4);
+
+    // Stop the metadata manager
+    upsertMetadataManager.stop();
+
+    // Close the metadata manager
+    upsertMetadataManager.close();
+  }
+
  /**
   * Verifies the addSegment / replaceSegment / removeSegment flows when the delete column is enabled,
   * i.e. when queryable doc ids are maintained alongside valid doc ids, for the given primary-key
   * hash function.
   *
   * Notation in the inline comments: primaryKey -> {docId, comparisonValue}.
   * Records with the delete flag set stay in validDocIds (they still anchor the record location) but
   * are excluded from queryableDocIds, as asserted after each step below.
   */
  private void verifyAddReplaceRemoveSegmentWithRecordDelete(HashFunction hashFunction)
      throws IOException {
    ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes upsertMetadataManager =
        new ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes(REALTIME_TABLE_NAME, 0,
            _contextBuilder.setHashFunction(hashFunction).build());
    // Peek into the manager's internal state so every step can be asserted directly.
    Map<Object, ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.RecordLocation> recordLocationMap =
        upsertMetadataManager._primaryKeyToRecordLocationMap;
    Set<IndexSegment> trackedSegments = upsertMetadataManager._trackedSegments;

    // Add the first segment
    int numRecords = 6;
    int[] primaryKeys = new int[]{0, 1, 2, 0, 1, 0};
    int[] timestamps = new int[]{100, 100, 100, 80, 120, 100};
    boolean[] deleteFlags = new boolean[]{false, false, false, true, true, false};
    ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
    ThreadSafeMutableRoaringBitmap queryableDocIds1 = new ThreadSafeMutableRoaringBitmap();
    List<PrimaryKey> primaryKeys1 = getPrimaryKeyList(numRecords, primaryKeys);
    ImmutableSegmentImpl segment1 = mockImmutableSegment(1, validDocIds1, queryableDocIds1, primaryKeys1);
    List<RecordInfo> recordInfoList1;
    // get recordInfo from validDocIdSnapshot.
    // segment1 snapshot: 0 -> {5, 100}, 1 -> {4, 120}, 2 -> {2, 100}
    int[] docIds1 = new int[]{2, 4, 5};
    MutableRoaringBitmap validDocIdsSnapshot1 = new MutableRoaringBitmap();
    validDocIdsSnapshot1.add(docIds1);
    recordInfoList1 = getRecordInfoList(validDocIdsSnapshot1, primaryKeys, timestamps, deleteFlags);
    upsertMetadataManager.addSegment(segment1, validDocIds1, queryableDocIds1, recordInfoList1.iterator());
    trackedSegments.add(segment1);
    // segment1: 0 -> {5, 100}, 1 -> {4, 120}, 2 -> {2, 100}
    assertEquals(recordLocationMap.size(), 3);
    checkRecordLocation(recordLocationMap, 0, segment1, 5, 100, hashFunction);
    checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, hashFunction);
    checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, hashFunction);
    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{2, 4, 5});
    // docId 4 (key 1) carries the delete flag, so it is valid but not queryable.
    assertEquals(queryableDocIds1.getMutableRoaringBitmap().toArray(), new int[]{2, 5});

    // Add the second segment
    numRecords = 5;
    primaryKeys = new int[]{0, 1, 2, 3, 0};
    timestamps = new int[]{100, 100, 120, 80, 80};
    deleteFlags = new boolean[]{false, true, true, false, false};
    ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
    ThreadSafeMutableRoaringBitmap queryableDocIds2 = new ThreadSafeMutableRoaringBitmap();
    SegmentMetadataImpl segmentMetadata2 = mock(SegmentMetadataImpl.class);
    when(segmentMetadata2.getIndexCreationTime()).thenReturn(System.currentTimeMillis());
    when(segmentMetadata2.getTotalDocs()).thenReturn(primaryKeys.length);
    ImmutableSegmentImpl segment2 = mockImmutableSegmentWithSegmentMetadata(2, validDocIds2, queryableDocIds2,
        getPrimaryKeyList(numRecords, primaryKeys), segmentMetadata2, null);
    List<RecordInfo> recordInfoList2;
    // get recordInfo from validDocIdSnapshot.
    // segment2 snapshot: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
    // segment1 snapshot: 1 -> {4, 120}
    MutableRoaringBitmap validDocIdsSnapshot2 = new MutableRoaringBitmap();
    validDocIdsSnapshot2.add(0, 2, 3);
    recordInfoList2 = getRecordInfoList(validDocIdsSnapshot2, primaryKeys, timestamps, deleteFlags);
    upsertMetadataManager.addSegment(segment2, validDocIds2, queryableDocIds2, recordInfoList2.iterator());
    trackedSegments.add(segment2);

    // segment1: 1 -> {4, 120}
    // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
    assertEquals(recordLocationMap.size(), 4);
    checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction);
    checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, hashFunction);
    checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction);
    checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction);
    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3});
    // Only delete-flagged docs remain for segment1 (docId 4), so its queryable bitmap is empty.
    assertTrue(queryableDocIds1.getMutableRoaringBitmap().isEmpty());
    assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 3});

    // Add an empty segment
    EmptyIndexSegment emptySegment = mockEmptySegment(3);
    upsertMetadataManager.addSegment(emptySegment);
    // Adding an empty segment must not disturb any existing state.
    // segment1: 1 -> {4, 120}
    // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
    assertEquals(recordLocationMap.size(), 4);
    checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction);
    checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, hashFunction);
    checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction);
    checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction);
    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3});
    assertTrue(queryableDocIds1.getMutableRoaringBitmap().isEmpty());
    assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 3});

    // Replace (reload) the first segment
    ThreadSafeMutableRoaringBitmap newValidDocIds1 = new ThreadSafeMutableRoaringBitmap();
    ThreadSafeMutableRoaringBitmap newQueryableDocIds1 = new ThreadSafeMutableRoaringBitmap();
    ImmutableSegmentImpl newSegment1 = mockImmutableSegment(1, newValidDocIds1, newQueryableDocIds1, primaryKeys1);
    upsertMetadataManager.replaceSegment(newSegment1, newValidDocIds1, newQueryableDocIds1, recordInfoList1.iterator(),
        segment1);
    trackedSegments.add(newSegment1);
    trackedSegments.remove(segment1);

    // Record locations owned by the old segment1 must be transferred to the reloaded copy.
    // original segment1: 1 -> {4, 120} (not in the map)
    // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
    // new segment1: 1 -> {4, 120}
    assertEquals(recordLocationMap.size(), 4);
    checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction);
    checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction);
    checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction);
    checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction);
    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3});
    assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
    assertTrue(queryableDocIds1.getMutableRoaringBitmap().isEmpty());
    assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 3});
    assertTrue(newQueryableDocIds1.getMutableRoaringBitmap().isEmpty());

    // Remove the original segment1
    upsertMetadataManager.removeSegment(segment1);
    // Removing the replaced (no longer tracked) segment must be a no-op on the map.
    // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
    // new segment1: 1 -> {4, 120}
    assertEquals(recordLocationMap.size(), 4);
    checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction);
    checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction);
    checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction);
    checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction);
    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3});
    assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
    assertTrue(queryableDocIds1.getMutableRoaringBitmap().isEmpty());
    assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 3});
    assertTrue(newQueryableDocIds1.getMutableRoaringBitmap().isEmpty());

    // Remove the empty segment
    upsertMetadataManager.removeSegment(emptySegment);
    // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
    // new segment1: 1 -> {4, 120}
    assertEquals(recordLocationMap.size(), 4);
    checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction);
    checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction);
    checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction);
    checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction);
    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3});
    assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
    assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 3});
    assertTrue(newQueryableDocIds1.getMutableRoaringBitmap().isEmpty());

    // Remove segment2
    upsertMetadataManager.removeSegment(segment2);
    // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80} (not in the map)
    // new segment1: 1 -> {4, 120}
    assertEquals(recordLocationMap.size(), 1);
    checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction);
    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3});
    assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
    assertEquals(trackedSegments, Collections.singleton(newSegment1));
    assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 3});
    assertTrue(newQueryableDocIds1.getMutableRoaringBitmap().isEmpty());

    // Stop the metadata manager
    upsertMetadataManager.stop();

    // Remove new segment1, should be no-op
    upsertMetadataManager.removeSegment(newSegment1);
    // new segment1: 1 -> {4, 120}
    assertEquals(recordLocationMap.size(), 1);
    checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction);
    assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
    assertEquals(trackedSegments, Collections.singleton(newSegment1));
    assertTrue(newQueryableDocIds1.getMutableRoaringBitmap().isEmpty());

    // Close the metadata manager
    upsertMetadataManager.close();
  }
+
+  private List<RecordInfo> getRecordInfoList(int numRecords, int[] 
primaryKeys, int[] timestamps,
+      @Nullable boolean[] deleteRecordFlags) {
+    List<RecordInfo> recordInfoList = new ArrayList<>();
+    for (int i = 0; i < numRecords; i++) {
+      recordInfoList.add(new RecordInfo(makePrimaryKey(primaryKeys[i]), i, 
timestamps[i],
+          deleteRecordFlags != null && deleteRecordFlags[i]));
+    }
+    return recordInfoList;
+  }
+
+  private List<RecordInfo> getRecordInfoListForTTL(int numRecords, int[] 
primaryKeys, int[] timestamps,
+      @Nullable boolean[] deleteRecordFlags) {
+    List<RecordInfo> recordInfoList = new ArrayList<>();
+    for (int i = 0; i < numRecords; i++) {
+      recordInfoList.add(new RecordInfo(makePrimaryKey(primaryKeys[i]), i, 
timestamps[i],
+          deleteRecordFlags != null && deleteRecordFlags[i]));
+    }
+    return recordInfoList;
+  }
+
+  /**
+   * Get recordInfo from validDocIdsSnapshot (enabledSnapshot = True).
+   */
+  private List<RecordInfo> getRecordInfoList(MutableRoaringBitmap 
validDocIdsSnapshot, int[] primaryKeys,
+      int[] timestamps, @Nullable boolean[] deleteRecordFlags) {
+    List<RecordInfo> recordInfoList = new ArrayList<>();
+    Iterator<java.lang.Integer> validDocIdsIterator = 
validDocIdsSnapshot.iterator();
+    validDocIdsIterator.forEachRemaining((docId) -> recordInfoList.add(
+        new RecordInfo(makePrimaryKey(primaryKeys[docId]), docId, 
timestamps[docId],
+            deleteRecordFlags != null && deleteRecordFlags[docId])));
+    return recordInfoList;
+  }
+
+  private List<PrimaryKey> getPrimaryKeyList(int numRecords, int[] 
primaryKeys) {
+    List<PrimaryKey> primaryKeyList = new ArrayList<>();
+    for (int i = 0; i < numRecords; i++) {
+      primaryKeyList.add(makePrimaryKey(primaryKeys[i]));
+    }
+    return primaryKeyList;
+  }
+
+  @Test
+  public void testAddRecord()
+      throws IOException {
+    verifyAddRecord(HashFunction.NONE);
+    verifyAddRecord(HashFunction.MD5);
+    verifyAddRecord(HashFunction.MURMUR3);
+  }
+
  /**
   * Verifies that ingesting records via addRecord moves primary-key locations between segments
   * according to the comparison value (larger or equal wins), for the given hash function.
   *
   * Notation in the inline comments: primaryKey -> {docId, comparisonValue}.
   */
  private void verifyAddRecord(HashFunction hashFunction)
      throws IOException {
    ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes upsertMetadataManager =
        new ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes(REALTIME_TABLE_NAME, 0,
            _contextBuilder.setHashFunction(hashFunction).build());
    Map<Object, ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.RecordLocation> recordLocationMap =
        upsertMetadataManager._primaryKeyToRecordLocationMap;

    // Add the first segment
    // segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}
    int numRecords = 3;
    int[] primaryKeys = new int[]{0, 1, 2};
    int[] timestamps = new int[]{100, 120, 100};
    ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
    ImmutableSegmentImpl segment1 =
        mockImmutableSegment(1, validDocIds1, null, getPrimaryKeyList(numRecords, primaryKeys));
    upsertMetadataManager.addSegment(segment1, validDocIds1, null,
        getRecordInfoList(numRecords, primaryKeys, timestamps, null).iterator());

    // Update records from the second segment
    ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
    MutableSegment segment2 = mockMutableSegment(1, validDocIds2, null);
    // New primary key 3: simply tracked in segment2.
    upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(3), 0, 100, false));

    // segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}
    // segment2: 3 -> {0, 100}
    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
    checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
    checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, hashFunction);
    checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction);
    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1, 2});
    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0});

    // Key 2 with a larger comparison value (120 > 100): location moves to segment2.
    upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(2), 1, 120, false));

    // segment1: 0 -> {0, 100}, 1 -> {1, 120}
    // segment2: 2 -> {1, 120}, 3 -> {0, 100}
    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
    checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
    checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, hashFunction);
    checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction);
    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1});

    // Key 1 with a smaller comparison value (100 < 120): out-of-order, nothing changes.
    upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(1), 2, 100, false));

    // segment1: 0 -> {0, 100}, 1 -> {1, 120}
    // segment2: 2 -> {1, 120}, 3 -> {0, 100}
    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
    checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
    checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, hashFunction);
    checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction);
    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1});

    // Key 0 with an equal comparison value (100 == 100): the newer arrival wins the location.
    upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(0), 3, 100, false));

    // segment1: 1 -> {1, 120}
    // segment2: 0 -> {3, 100}, 2 -> {1, 120}, 3 -> {0, 100}
    checkRecordLocation(recordLocationMap, 0, segment2, 3, 100, hashFunction);
    checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
    checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, hashFunction);
    checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction);
    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{1});
    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1, 3});

    // Stop the metadata manager
    upsertMetadataManager.stop();

    // Add record should be no-op
    upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(0), 4, 120, false));
    // segment1: 1 -> {1, 120}
    // segment2: 0 -> {3, 100}, 2 -> {1, 120}, 3 -> {0, 100}
    checkRecordLocation(recordLocationMap, 0, segment2, 3, 100, hashFunction);
    checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
    checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, hashFunction);
    checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction);
    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{1});
    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1, 3});

    // Close the metadata manager
    upsertMetadataManager.close();
  }
+
+  @Test
+  public void testAddOutOfOrderRecord()
+      throws IOException {
+    verifyAddOutOfOrderRecord(HashFunction.NONE);
+    verifyAddOutOfOrderRecord(HashFunction.MD5);
+    verifyAddOutOfOrderRecord(HashFunction.MURMUR3);
+  }
+
+  private void verifyAddOutOfOrderRecord(HashFunction hashFunction)
+      throws IOException {
+    ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes 
upsertMetadataManager =
+        new 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes(REALTIME_TABLE_NAME,
 0,
+            _contextBuilder.setHashFunction(hashFunction).build());
+    Map<Object, 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.RecordLocation> 
recordLocationMap =
+        upsertMetadataManager._primaryKeyToRecordLocationMap;
+
+    // Add the first segment
+    // segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}
+    int numRecords = 3;
+    int[] primaryKeys = new int[]{0, 1, 2};
+    int[] timestamps = new int[]{100, 120, 100};
+    ThreadSafeMutableRoaringBitmap validDocIds1 = new 
ThreadSafeMutableRoaringBitmap();
+    ImmutableSegmentImpl segment1 =
+        mockImmutableSegment(1, validDocIds1, null, 
getPrimaryKeyList(numRecords, primaryKeys));
+    upsertMetadataManager.addSegment(segment1, validDocIds1, null,
+        getRecordInfoList(numRecords, primaryKeys, timestamps, 
null).iterator());
+
+    // Update records from the second segment
+    ThreadSafeMutableRoaringBitmap validDocIds2 = new 
ThreadSafeMutableRoaringBitmap();
+    MutableSegment segment2 = mockMutableSegment(1, validDocIds2, null);
+
+    // new record, should return false for out of order event
+    boolean isOutOfOrderRecord =
+        !upsertMetadataManager.addRecord(segment2, new 
RecordInfo(makePrimaryKey(3), 0, 100, false));
+    assertFalse(isOutOfOrderRecord);
+
+    // segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}
+    // segment2: 3 -> {0, 100}
+    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
+    checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
+    checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, hashFunction);
+    checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction);
+    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{0, 1, 2});
+    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{0});
+
+    // send an out-of-order event, should return true for orderness of event
+    isOutOfOrderRecord = !upsertMetadataManager.addRecord(segment2, new 
RecordInfo(makePrimaryKey(2), 1, 80, false));
+    assertTrue(isOutOfOrderRecord);
+
+    // ordered event for an existing key
+    isOutOfOrderRecord = !upsertMetadataManager.addRecord(segment2, new 
RecordInfo(makePrimaryKey(2), 1, 150, false));
+    assertFalse(isOutOfOrderRecord);
+
+    // segment1: 0 -> {0, 100}, 1 -> {1, 120}
+    // segment2: 3 -> {0, 100}, 2 -> {1, 150}
+    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
+    checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
+    checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction);
+    checkRecordLocation(recordLocationMap, 2, segment2, 1, 150, hashFunction);
+    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{0, 1});
+    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{0, 1});
+
+    // Close the metadata manager
+    upsertMetadataManager.stop();
+    upsertMetadataManager.close();
+  }
+
+  @Test
+  public void testAddRecordWithDeleteColumn()
+      throws IOException {
+    verifyAddRecordWithDeleteColumn(HashFunction.NONE);
+    verifyAddRecordWithDeleteColumn(HashFunction.MD5);
+    verifyAddRecordWithDeleteColumn(HashFunction.MURMUR3);
+  }
+
  /**
   * Verifies addRecord behavior when the delete column is enabled: delete-flagged records keep the
   * primary key's location (validDocIds) but drop it from the queryable set (queryableDocIds), and a
   * later record with a larger comparison value revives the key.
   *
   * Notation in the inline comments: primaryKey -> {docId, comparisonValue}.
   */
  private void verifyAddRecordWithDeleteColumn(HashFunction hashFunction)
      throws IOException {
    ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes upsertMetadataManager =
        new ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes(REALTIME_TABLE_NAME, 0,
            _contextBuilder.setHashFunction(hashFunction).build());
    Map<Object, ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.RecordLocation> recordLocationMap =
        upsertMetadataManager._primaryKeyToRecordLocationMap;

    // queryableDocIds is same as validDocIds in the absence of delete markers
    // Add the first segment
    // segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}
    int numRecords = 3;
    int[] primaryKeys = new int[]{0, 1, 2};
    int[] timestamps = new int[]{100, 120, 100};
    ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
    ThreadSafeMutableRoaringBitmap queryableDocIds1 = new ThreadSafeMutableRoaringBitmap();
    ImmutableSegmentImpl segment1 =
        mockImmutableSegment(1, validDocIds1, queryableDocIds1, getPrimaryKeyList(numRecords, primaryKeys));
    upsertMetadataManager.addSegment(segment1, validDocIds1, queryableDocIds1,
        getRecordInfoList(numRecords, primaryKeys, timestamps, null).iterator());

    // Update records from the second segment
    ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
    ThreadSafeMutableRoaringBitmap queryableDocIds2 = new ThreadSafeMutableRoaringBitmap();
    MutableSegment segment2 = mockMutableSegment(1, validDocIds2, queryableDocIds2);
    upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(3), 0, 100, false));

    // segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}
    // segment2: 3 -> {0, 100}
    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
    checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
    checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, hashFunction);
    checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction);
    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1, 2});
    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0});
    assertEquals(queryableDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1, 2});
    assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0});

    // Mark a record with latest value in segment1 as deleted
    upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(2), 1, 120, true));

    // Key 2 now lives in segment2 (valid for location tracking) but is not queryable there.
    // segment1: 0 -> {0, 100}, 1 -> {1, 120}
    // segment2: 2 -> {1, 120}, 3 -> {0, 100}
    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
    checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
    checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, hashFunction);
    checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction);
    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
    assertEquals(queryableDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
    assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0});

    // Mark a record with latest value in segment2 as deleted
    upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(3), 2, 150, true));

    // Both keys tracked in segment2 are delete-flagged, so its queryable bitmap is empty.
    // segment1: 0 -> {0, 100}, 1 -> {1, 120}
    // segment2: 2 -> {1, 120}, 3 -> {2, 150}
    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
    checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
    checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, hashFunction);
    checkRecordLocation(recordLocationMap, 3, segment2, 2, 150, hashFunction);
    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{1, 2});
    assertEquals(queryableDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
    assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new int[]{});

    // Revive a deleted primary key (by providing a larger comparisonValue)
    upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(3), 3, 200, false));

    // segment1: 0 -> {0, 100}, 1 -> {1, 120}
    // segment2: 2 -> {1, 120}, 3 -> {3, 200}
    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
    checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
    checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, hashFunction);
    checkRecordLocation(recordLocationMap, 3, segment2, 3, 200, hashFunction);
    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{1, 3});
    assertEquals(queryableDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
    assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new int[]{3});

    // Stop the metadata manager
    upsertMetadataManager.stop();

    // Add record should be no-op
    upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(0), 4, 120, false));
    // segment1: 0 -> {0, 100}, 1 -> {1, 120}
    // segment2: 2 -> {1, 120}, 3 -> {3, 200}
    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
    checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
    checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, hashFunction);
    checkRecordLocation(recordLocationMap, 3, segment2, 3, 200, hashFunction);
    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{1, 3});
    assertEquals(queryableDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
    assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new int[]{3});

    // Close the metadata manager
    upsertMetadataManager.close();
  }
+
+  @Test
+  public void testRemoveExpiredDeletedKeys()
+      throws IOException {
+    verifyRemoveExpiredDeletedKeys(HashFunction.NONE);
+    verifyRemoveExpiredDeletedKeys(HashFunction.MD5);
+    verifyRemoveExpiredDeletedKeys(HashFunction.MURMUR3);
+  }
+
+  private void verifyRemoveExpiredDeletedKeys(HashFunction hashFunction)
+      throws IOException {
+    ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes 
upsertMetadataManager =
+        new 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes(REALTIME_TABLE_NAME,
 0,
+            _contextBuilder.setHashFunction(hashFunction).build());
+    Map<Object, 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.RecordLocation> 
recordLocationMap =
+        upsertMetadataManager._primaryKeyToRecordLocationMap;
+
+    // Add the first segment
+    // segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}
+    int numRecords = 3;
+    int[] primaryKeys = new int[]{0, 1, 2};
+    int[] timestamps = new int[]{100, 120, 100};
+    ThreadSafeMutableRoaringBitmap validDocIds1 = new 
ThreadSafeMutableRoaringBitmap();
+    ThreadSafeMutableRoaringBitmap queryableDocIds1 = new 
ThreadSafeMutableRoaringBitmap();
+    ImmutableSegmentImpl segment1 =
+        mockImmutableSegment(1, validDocIds1, queryableDocIds1, 
getPrimaryKeyList(numRecords, primaryKeys));
+    upsertMetadataManager.addSegment(segment1, validDocIds1, queryableDocIds1,
+        getRecordInfoListForTTL(numRecords, primaryKeys, timestamps, 
null).iterator());
+
+    // Update records from the second segment
+    ThreadSafeMutableRoaringBitmap validDocIds2 = new 
ThreadSafeMutableRoaringBitmap();
+    ThreadSafeMutableRoaringBitmap queryableDocIds2 = new 
ThreadSafeMutableRoaringBitmap();
+    MutableSegment segment2 = mockMutableSegment(1, validDocIds2, 
queryableDocIds2);
+    upsertMetadataManager.addRecord(segment2, new 
RecordInfo(makePrimaryKey(3), 0, 100, false));
+
+    // segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}
+    // segment2: 3 -> {0, 100}
+    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
+    checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
+    checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, hashFunction);
+    checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction);
+    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{0, 1, 2});
+    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{0});
+    assertEquals(queryableDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{0, 1, 2});
+    assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{0});
+
+    // Mark a record with latest value in segment1 as deleted (within 
TTL-window)
+    upsertMetadataManager.addRecord(segment2, new 
RecordInfo(makePrimaryKey(2), 1, 150, true));
+
+    // segment1: 0 -> {0, 100}, 1 -> {1, 120}
+    // segment2: 2 -> {1, 150}, 3 -> {0, 100}
+    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
+    checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
+    checkRecordLocation(recordLocationMap, 2, segment2, 1, 150, hashFunction);
+    checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction);
+    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{0, 1});
+    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{0, 1});
+    assertEquals(queryableDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{0, 1});
+    assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{0});
+
+    // Mark a record with latest value in segment2 as deleted (outside TTL 
window)
+    upsertMetadataManager.addRecord(segment2, new 
RecordInfo(makePrimaryKey(3), 2, 120, true));
+    // Mark a record with latest value in segment1 as deleted (outside TTL 
window)
+    upsertMetadataManager.addRecord(segment2, new 
RecordInfo(makePrimaryKey(1), 3, 120, true));
+
+    // now we have 3 records marked as deleted, one is within TTL window, rest 
2 are outside TTL window
+    // For the other 2, one key has data in one previous segment as well and 
one has all data for the keys
+    // in the same segment.
+    // segment1: 0 -> {0, 100}
+    // segment2: 1 -> {3, 120}, 2 -> {1, 150}, 3 -> {2, 120}
+    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
+    checkRecordLocation(recordLocationMap, 1, segment2, 3, 120, hashFunction);
+    checkRecordLocation(recordLocationMap, 2, segment2, 1, 150, hashFunction);
+    checkRecordLocation(recordLocationMap, 3, segment2, 2, 120, hashFunction);
+    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{0});
+    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{1, 2, 3});
+    assertEquals(queryableDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{0});
+    assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{});
+
+    // call delete-key workflow
+    // value 2 will be there as it is within TTL window
+    // value 1 will also be there as it is outside TTL window but data exists 
in one more segment
+    // value 3 will be removed
+    upsertMetadataManager.removeExpiredPrimaryKeys();
+    // segment1: 0 -> {0, 100}
+    // segment2: 1 -> {3, 120}, 2 -> {1, 150}
+    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
+    checkRecordLocation(recordLocationMap, 1, segment2, 3, 120, hashFunction);
+    checkRecordLocation(recordLocationMap, 2, segment2, 1, 150, hashFunction);
+    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{0});
+    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{1, 3});
+    assertEquals(queryableDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{0});
+    assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{});
+
+    // Stop the metadata manager
+    upsertMetadataManager.stop();
+
+    // Close the metadata manager
+    upsertMetadataManager.close();
+  }
+
+  public void verifyGetQueryableDocIds(boolean isDeleteColumnNull, boolean[] 
deleteFlags,
+      MutableRoaringBitmap validDocIdsSnapshot, MutableRoaringBitmap 
queryableDocIds) {
+    ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes 
upsertMetadataManager =
+        new 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes(REALTIME_TABLE_NAME,
 0,
+            _contextBuilder.build());
+
+    try (MockedConstruction<PinotSegmentColumnReader> deleteColReader = 
mockConstruction(PinotSegmentColumnReader.class,
+        (mockReader, context) -> {
+          for (int i = 0; i < deleteFlags.length; i++) {
+            when(mockReader.isNull(i)).thenReturn(isDeleteColumnNull);
+            when(mockReader.getValue(i)).thenReturn(deleteFlags[i]);
+          }
+        })) {
+
+      SegmentMetadataImpl segmentMetadata = mock(SegmentMetadataImpl.class);
+      ColumnMetadata columnMetadata = mock(ColumnMetadata.class);
+      when(segmentMetadata.getTotalDocs()).thenReturn(deleteFlags.length);
+      when(segmentMetadata.getColumnMetadataMap()).thenReturn(new TreeMap() {{
+        this.put(COMPARISON_COLUMNS.get(0), columnMetadata);
+      }});
+
+      ImmutableSegmentImpl segment =
+          mockImmutableSegmentWithSegmentMetadata(1, new 
ThreadSafeMutableRoaringBitmap(), null, null, segmentMetadata,
+              validDocIdsSnapshot);
+      assertEquals(upsertMetadataManager.getQueryableDocIds(segment, 
validDocIdsSnapshot), queryableDocIds);
+    }
+  }
+
+  @Test
+  public void testHashPrimaryKey() {
+    PrimaryKey pk = new PrimaryKey(new Object[]{"uuid-1", "uuid-2", "uuid-3"});
+    assertEquals(BytesUtils.toHexString(((ByteArray) 
HashUtils.hashPrimaryKey(pk, HashFunction.MD5)).getBytes()),
+        "6ca926be8c2d1d980acf48ba48418e24");
+    assertEquals(BytesUtils.toHexString(((ByteArray) 
HashUtils.hashPrimaryKey(pk, HashFunction.MURMUR3)).getBytes()),
+        "e4540494e43b27e312d01f33208c6a4e");
+    // reorder
+    pk = new PrimaryKey(new Object[]{"uuid-3", "uuid-2", "uuid-1"});
+    assertEquals(BytesUtils.toHexString(((ByteArray) 
HashUtils.hashPrimaryKey(pk, HashFunction.MD5)).getBytes()),
+        "fc2159b78d07f803fdfb0b727315a445");
+    assertEquals(BytesUtils.toHexString(((ByteArray) 
HashUtils.hashPrimaryKey(pk, HashFunction.MURMUR3)).getBytes()),
+        "37fab5ef0ea39711feabcdc623cb8a4e");
+  }
+}
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactoryTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactoryTest.java
new file mode 100644
index 0000000000..daf10826c9
--- /dev/null
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactoryTest.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.upsert;
+
+import org.apache.pinot.spi.config.table.HashFunction;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.UpsertConfig;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+
+public class TableUpsertMetadataManagerFactoryTest {
+  private static final String RAW_TABLE_NAME = "testTable";
+  private TableConfig _tableConfig;
+
+  @Test
+  public void testCreateForDefaultManagerClass() {
+    UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+    upsertConfig.setHashFunction(HashFunction.NONE);
+    _tableConfig =
+        new 
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setUpsertConfig(upsertConfig).build();
+    TableUpsertMetadataManager tableUpsertMetadataManager =
+        TableUpsertMetadataManagerFactory.create(_tableConfig, null);
+    assertNotNull(tableUpsertMetadataManager);
+    assertTrue(tableUpsertMetadataManager instanceof 
ConcurrentMapTableUpsertMetadataManager);
+  }
+
+  @Test
+  public void testCreateForManagerClassWithConsistentDeletes() {
+    UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+    upsertConfig.setHashFunction(HashFunction.NONE);
+    upsertConfig.setEnableDeletedKeysCompactionConsistency(true);
+    _tableConfig =
+        new 
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setUpsertConfig(upsertConfig).build();
+    TableUpsertMetadataManager tableUpsertMetadataManager =
+        TableUpsertMetadataManagerFactory.create(_tableConfig, null);
+    assertNotNull(tableUpsertMetadataManager);
+    assertTrue(tableUpsertMetadataManager instanceof 
BaseTableUpsertMetadataManager);
+  }
+}
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index 8062b422f0..e35c671705 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -1961,6 +1961,83 @@ public class TableConfigUtilsTest {
     } catch (IllegalStateException e) {
       Assert.assertEquals(e.getMessage(), "The outOfOrderRecordColumn must be 
a single-valued BOOLEAN column");
     }
+
+    // test enableDeletedKeysCompactionConsistency shouldn't exist with 
metadataTTL
+    upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+    upsertConfig.setEnableDeletedKeysCompactionConsistency(true);
+    upsertConfig.setMetadataTTL(1.0);
+    tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setStreamConfigs(streamConfigs)
+        .setUpsertConfig(upsertConfig).setRoutingConfig(
+            new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
+        .build();
+    try {
+      TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
+    } catch (IllegalStateException e) {
+      Assert.assertEquals(e.getMessage(),
+          "enableDeletedKeysCompactionConsistency and metadataTTL shouldn't 
exist together for upsert table");
+    }
+
+    // test enableDeletedKeysCompactionConsistency shouldn't exist with 
enablePreload
+    upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+    upsertConfig.setEnableDeletedKeysCompactionConsistency(true);
+    upsertConfig.setEnablePreload(true);
+    tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setStreamConfigs(streamConfigs)
+        .setUpsertConfig(upsertConfig).setRoutingConfig(
+            new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
+        .build();
+    try {
+      TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
+    } catch (IllegalStateException e) {
+      Assert.assertEquals(e.getMessage(),
+          "enableDeletedKeysCompactionConsistency and enablePreload shouldn't 
exist together for upsert table");
+    }
+
+    // test enableDeletedKeysCompactionConsistency should exist with 
deletedKeysTTL
+    upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+    upsertConfig.setEnableDeletedKeysCompactionConsistency(true);
+    upsertConfig.setDeletedKeysTTL(0);
+    tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setStreamConfigs(streamConfigs)
+        .setUpsertConfig(upsertConfig).setRoutingConfig(
+            new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
+        .build();
+    try {
+      TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
+    } catch (IllegalStateException e) {
+      Assert.assertEquals(e.getMessage(),
+          "enableDeletedKeysCompactionConsistency should exist with 
deletedKeysTTL for upsert table");
+    }
+
+    // test enableDeletedKeysCompactionConsistency should exist with 
enableSnapshot
+    upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+    upsertConfig.setEnableDeletedKeysCompactionConsistency(true);
+    upsertConfig.setDeletedKeysTTL(100);
+    upsertConfig.setEnableSnapshot(false);
+    tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setStreamConfigs(streamConfigs)
+        .setUpsertConfig(upsertConfig).setRoutingConfig(
+            new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
+        .build();
+    try {
+      TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
+    } catch (IllegalStateException e) {
+      Assert.assertEquals(e.getMessage(),
+          "enableDeletedKeysCompactionConsistency should exist with 
enableSnapshot for upsert table");
+    }
+
+    // test enableDeletedKeysCompactionConsistency should exist with 
UpsertCompactionTask
+    upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+    upsertConfig.setEnableDeletedKeysCompactionConsistency(true);
+    upsertConfig.setDeletedKeysTTL(100);
+    upsertConfig.setEnableSnapshot(true);
+    tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setStreamConfigs(streamConfigs)
+        .setUpsertConfig(upsertConfig).setRoutingConfig(
+            new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
+        .build();
+    try {
+      TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
+    } catch (IllegalStateException e) {
+      Assert.assertEquals(e.getMessage(),
+          "enableDeletedKeysCompactionConsistency should exist with 
UpsertCompactionTask for upsert table");
+    }
   }
 
   @Test
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
index 51588c273c..3b453aca92 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
@@ -76,6 +76,9 @@ public class UpsertConfig extends BaseJsonConfig {
   @JsonPropertyDescription("TTL for upsert metadata cleanup for deleted keys, 
it uses the same unit as comparison col")
   private double _deletedKeysTTL;
 
+  @JsonPropertyDescription("If we are using deletedKeysTTL + compaction we 
need to enable this for data consistency")
+  private boolean _enableDeletedKeysCompactionConsistency;
+
   @JsonPropertyDescription("Whether to preload segments for fast upsert 
metadata recovery")
   private boolean _enablePreload;
 
@@ -160,6 +163,10 @@ public class UpsertConfig extends BaseJsonConfig {
     return _enablePreload;
   }
 
+  public boolean isEnableDeletedKeysCompactionConsistency() {
+    return _enableDeletedKeysCompactionConsistency;
+  }
+
   public ConsistencyMode getConsistencyMode() {
     return _consistencyMode;
   }
@@ -270,6 +277,10 @@ public class UpsertConfig extends BaseJsonConfig {
     _dropOutOfOrderRecord = dropOutOfOrderRecord;
   }
 
+  public void setEnableDeletedKeysCompactionConsistency(boolean 
enableDeletedKeysCompactionConsistency) {
+    _enableDeletedKeysCompactionConsistency = 
enableDeletedKeysCompactionConsistency;
+  }
+
   public void setMetadataManagerClass(String metadataManagerClass) {
     _metadataManagerClass = metadataManagerClass;
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to