This is an automated email from the ASF dual-hosted git repository.

xbli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 983d3563ef add metadata ttl field to DedupConfig (#13636)
983d3563ef is described below

commit 983d3563ef9717648a24cb31e9bc2545054226c3
Author: Haitao Zhang <[email protected]>
AuthorDate: Fri Aug 16 12:09:05 2024 -0700

    add metadata ttl field to DedupConfig (#13636)
    
    * add metadata ttl field to DedupConfig
    
    * implement dedup metadata ttl handling logic in 
ConcurrentMapPartitionDedupMetadataManager
---
 .../apache/pinot/common/metrics/ServerTimer.java   |   5 +-
 .../common/utils/config/TableConfigSerDeTest.java  |  30 ++-
 .../realtime/RealtimeSegmentDataManager.java       |   9 +-
 .../manager/realtime/RealtimeTableDataManager.java |  31 ++-
 .../dedup/BasePartitionDedupMetadataManager.java   | 246 +++++++++++++++++
 .../local/dedup/BaseTableDedupMetadataManager.java |  61 ++++-
 ...ConcurrentMapPartitionDedupMetadataManager.java | 171 ++++++------
 .../ConcurrentMapTableDedupMetadataManager.java    |   5 +-
 .../pinot/segment/local/dedup/DedupContext.java    | 159 +++++++++++
 ...upMetadataManager.java => DedupRecordInfo.java} |  25 +-
 .../pinot/segment/local/dedup/DedupUtils.java      |  95 +++++++
 .../local/dedup/PartitionDedupMetadataManager.java |  44 +++-
 .../local/dedup/TableDedupMetadataManager.java     |   8 +-
 .../indexsegment/mutable/MutableSegmentImpl.java   |  21 +-
 .../local/realtime/impl/RealtimeSegmentConfig.java |  16 +-
 .../local/segment/readers/PrimaryKeyReader.java    |  69 +++++
 .../upsert/BasePartitionUpsertMetadataManager.java |   4 +-
 ...nUpsertMetadataManagerForConsistentDeletes.java |   3 +-
 .../pinot/segment/local/upsert/UpsertUtils.java    |  51 +---
 ...apPartitionDedupMetadataManagerWithTTLTest.java | 291 +++++++++++++++++++++
 ...artitionDedupMetadataManagerWithoutTTLTest.java | 158 +++++++++++
 .../pinot/segment/local/dedup/DedupTestUtils.java  |  57 ++++
 .../dedup/PartitionDedupMetadataManagerTest.java   | 171 ------------
 .../mutable/MutableSegmentDedupeTest.java          |  93 ++++++-
 .../mutable/MutableSegmentImplTestUtils.java       |  17 +-
 .../MutableSegmentImplUpsertComparisonColTest.java |   2 +-
 .../mutable/MutableSegmentImplUpsertTest.java      |   2 +-
 .../src/test/resources/data/test_dedup_data.json   |  12 +-
 .../src/test/resources/data/test_dedup_schema.json |   4 +
 .../apache/pinot/spi/config/table/DedupConfig.java |  40 ++-
 .../apache/pinot/spi/config/table/TableConfig.java |  10 +
 31 files changed, 1552 insertions(+), 358 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java
index 79e1eff8e0..b3e5e70641 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java
@@ -52,10 +52,13 @@ public enum ServerTimer implements AbstractMetrics.Timer {
   UPSERT_PRELOAD_TIME_MS("milliseconds", false,
       "Total time taken to preload a table partition of an upsert table with 
upsert snapshot"),
   UPSERT_REMOVE_EXPIRED_PRIMARY_KEYS_TIME_MS("milliseconds", false,
-      "Total time taken to delete expired primary keys based on metadataTTL or 
deletedKeysTTL"),
+      "Total time taken to delete expired upsert primary keys based on 
metadataTTL or deletedKeysTTL"),
   GRPC_QUERY_EXECUTION_MS("milliseconds", false, "Total execution time of a 
successful query over gRPC"),
   UPSERT_SNAPSHOT_TIME_MS("milliseconds", false, "Total time taken to take 
upsert table snapshot"),
 
+  DEDUP_REMOVE_EXPIRED_PRIMARY_KEYS_TIME_MS("milliseconds", false,
+      "Total time taken to delete expired dedup primary keys based on 
metadataTTL or deletedKeysTTL"),
+
   // Multi-stage
   /**
    * Time spent building the hash table for the join.
diff --git 
a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
 
b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
index dc9235d793..47de3d6225 100644
--- 
a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
+++ 
b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
@@ -259,12 +259,22 @@ public class TableConfigSerDeTest {
       
checkTableConfigWithUpsertConfig(TableConfigUtils.fromZNRecord(TableConfigUtils.toZNRecord(tableConfig)));
     }
     {
-      // with dedup config
+      // with dedup config - without metadata ttl and metadata time column
       DedupConfig dedupConfig = new DedupConfig(true, HashFunction.MD5);
       TableConfig tableConfig = 
tableConfigBuilder.setDedupConfig(dedupConfig).build();
       // Serialize then de-serialize
-      
checkTableConfigWithDedupConfig(JsonUtils.stringToObject(tableConfig.toJsonString(),
 TableConfig.class));
-      
checkTableConfigWithDedupConfig(TableConfigUtils.fromZNRecord(TableConfigUtils.toZNRecord(tableConfig)));
+      checkTableConfigWithDedupConfigWithoutTTL(
+          JsonUtils.stringToObject(tableConfig.toJsonString(), 
TableConfig.class));
+      checkTableConfigWithDedupConfigWithoutTTL(
+          
TableConfigUtils.fromZNRecord(TableConfigUtils.toZNRecord(tableConfig)));
+    }
+    {
+      // with dedup config - with metadata ttl and metadata time column
+      DedupConfig dedupConfig = new DedupConfig(true, HashFunction.MD5, null, 
null, 10, "dedupTimeColumn");
+      TableConfig tableConfig = 
tableConfigBuilder.setDedupConfig(dedupConfig).build();
+      // Serialize then de-serialize
+      
checkTableConfigWithDedupConfigWithTTL(JsonUtils.stringToObject(tableConfig.toJsonString(),
 TableConfig.class));
+      
checkTableConfigWithDedupConfigWithTTL(TableConfigUtils.fromZNRecord(TableConfigUtils.toZNRecord(tableConfig)));
     }
     {
       // with SegmentsValidationAndRetentionConfig
@@ -556,11 +566,23 @@ public class TableConfigSerDeTest {
     assertEquals(upsertConfig.getMode(), UpsertConfig.Mode.FULL);
   }
 
-  private void checkTableConfigWithDedupConfig(TableConfig tableConfig) {
+  private void checkTableConfigWithDedupConfigWithoutTTL(TableConfig 
tableConfig) {
+    DedupConfig dedupConfig = tableConfig.getDedupConfig();
+    assertNotNull(dedupConfig);
+
+    assertTrue(dedupConfig.isDedupEnabled());
+    assertEquals(dedupConfig.getHashFunction(), HashFunction.MD5);
+    assertEquals(dedupConfig.getMetadataTTL(), 0);
+    assertNull(dedupConfig.getDedupTimeColumn());
+  }
+
+  private void checkTableConfigWithDedupConfigWithTTL(TableConfig tableConfig) 
{
     DedupConfig dedupConfig = tableConfig.getDedupConfig();
     assertNotNull(dedupConfig);
 
     assertTrue(dedupConfig.isDedupEnabled());
     assertEquals(dedupConfig.getHashFunction(), HashFunction.MD5);
+    assertEquals(dedupConfig.getMetadataTTL(), 10);
+    assertEquals(dedupConfig.getDedupTimeColumn(), "dedupTimeColumn");
   }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index 73915c5a2e..b04bf4a36e 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -250,6 +250,7 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
   private final AtomicBoolean _acquiredConsumerSemaphore;
   private final ServerMetrics _serverMetrics;
   private final PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
+  private final PartitionDedupMetadataManager _partitionDedupMetadataManager;
   private final BooleanSupplier _isReadyToConsumeData;
   private final MutableSegmentImpl _realtimeSegment;
   private volatile StreamPartitionMsgOffset _currentOffset; // Next offset to 
be consumed
@@ -722,6 +723,10 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
           }
         }
 
+        if (_partitionDedupMetadataManager != null && 
_tableConfig.getDedupMetadataTTL() > 0) {
+          _partitionDedupMetadataManager.removeExpiredPrimaryKeys();
+        }
+
         while (!_state.isFinal()) {
           if (_state.shouldConsume()) {
             consumeLoop();  // Consume until we reached the end criteria, or 
we are stopped.
@@ -1441,6 +1446,7 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
     _schema = schema;
     _serverMetrics = serverMetrics;
     _partitionUpsertMetadataManager = partitionUpsertMetadataManager;
+    _partitionDedupMetadataManager = partitionDedupMetadataManager;
     _isReadyToConsumeData = isReadyToConsumeData;
     _segmentVersion = indexLoadingConfig.getSegmentVersion();
     _instanceId = _realtimeTableDataManager.getInstanceId();
@@ -1560,11 +1566,12 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
             .setNullHandlingEnabled(_nullHandlingEnabled)
             
.setConsumerDir(consumerDir).setUpsertMode(tableConfig.getUpsertMode())
             .setPartitionUpsertMetadataManager(partitionUpsertMetadataManager)
-            .setPartitionDedupMetadataManager(partitionDedupMetadataManager)
             
.setUpsertComparisonColumns(tableConfig.getUpsertComparisonColumns())
             
.setUpsertDeleteRecordColumn(tableConfig.getUpsertDeleteRecordColumn())
             
.setUpsertOutOfOrderRecordColumn(tableConfig.getOutOfOrderRecordColumn())
             
.setUpsertDropOutOfOrderRecord(tableConfig.isDropOutOfOrderRecord())
+            .setPartitionDedupMetadataManager(partitionDedupMetadataManager)
+            .setDedupTimeColumn(tableConfig.getDedupTimeColumn())
             .setFieldConfigList(tableConfig.getFieldConfigList());
 
     // Create message decoder
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 8393da3884..8a99610c75 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -248,12 +248,22 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
     // Make sure we do metric cleanup when we shut down the table.
     // Do this first, so we do not show ingestion lag during shutdown.
     _ingestionDelayTracker.shutdown();
-    if (_tableUpsertMetadataManager != null) {
+    if (_tableUpsertMetadataManager != null || _tableDedupMetadataManager != 
null) {
       // Stop the upsert metadata manager first to prevent removing metadata 
when destroying segments
-      _tableUpsertMetadataManager.stop();
+      if (_tableUpsertMetadataManager != null) {
+        _tableUpsertMetadataManager.stop();
+      }
+      if (_tableDedupMetadataManager != null) {
+        _tableDedupMetadataManager.stop();
+      }
       releaseAndRemoveAllSegments();
       try {
-        _tableUpsertMetadataManager.close();
+        if (_tableUpsertMetadataManager != null) {
+          _tableUpsertMetadataManager.close();
+        }
+        if (_tableDedupMetadataManager != null) {
+          _tableDedupMetadataManager.close();
+        }
       } catch (IOException e) {
         _logger.warn("Caught exception while closing upsert metadata manager", 
e);
       }
@@ -545,14 +555,13 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
       return;
     }
 
-    // TODO: Change dedup handling to handle segment replacement
     if (isDedupEnabled() && immutableSegment instanceof ImmutableSegmentImpl) {
-      buildDedupMeta((ImmutableSegmentImpl) immutableSegment);
+      handleDedup((ImmutableSegmentImpl) immutableSegment);
     }
     super.addSegment(immutableSegment);
   }
 
-  private void buildDedupMeta(ImmutableSegmentImpl immutableSegment) {
+  private void handleDedup(ImmutableSegmentImpl immutableSegment) {
     // TODO(saurabh) refactor commons code with handleUpsert
     String segmentName = immutableSegment.getSegmentName();
     Integer partitionGroupId =
@@ -563,7 +572,15 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
     PartitionDedupMetadataManager partitionDedupMetadataManager =
         
_tableDedupMetadataManager.getOrCreatePartitionManager(partitionGroupId);
     immutableSegment.enableDedup(partitionDedupMetadataManager);
-    partitionDedupMetadataManager.addSegment(immutableSegment);
+    SegmentDataManager oldSegmentManager = 
_segmentDataManagerMap.get(segmentName);
+    if (oldSegmentManager != null) {
+      LOGGER.info("Replacing mutable segment: {} with immutable segment: {} in 
partition dedup metadata manager",
+          oldSegmentManager.getSegment().getSegmentName(), segmentName);
+      
partitionDedupMetadataManager.replaceSegment(oldSegmentManager.getSegment(), 
immutableSegment);
+    } else {
+      LOGGER.info("Adding immutable segment: {} to partition dedup metadata 
manager", segmentName);
+      partitionDedupMetadataManager.addSegment(immutableSegment);
+    }
   }
 
   private void handleUpsert(ImmutableSegment immutableSegment) {
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BasePartitionDedupMetadataManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BasePartitionDedupMetadataManager.java
new file mode 100644
index 0000000000..b711fdce44
--- /dev/null
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BasePartitionDedupMetadataManager.java
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.dedup;
+
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.metrics.ServerGauge;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.metrics.ServerTimer;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.spi.config.table.HashFunction;
+import org.apache.pinot.spi.data.readers.PrimaryKey;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public abstract class BasePartitionDedupMetadataManager implements 
PartitionDedupMetadataManager {
+  protected final String _tableNameWithType;
+  protected final List<String> _primaryKeyColumns;
+  protected final int _partitionId;
+  protected final ServerMetrics _serverMetrics;
+  protected final HashFunction _hashFunction;
+  protected final double _metadataTTL;
+  protected final String _dedupTimeColumn;
+  protected final Logger _logger;
+
+  // The following variables are always accessed within synchronized block
+  private boolean _stopped;
+  // Initialize with 1 pending operation to indicate the metadata manager can 
take more operations
+  private int _numPendingOperations = 1;
+  private boolean _closed;
+
+  protected BasePartitionDedupMetadataManager(String tableNameWithType, int 
partitionId, DedupContext dedupContext) {
+    _tableNameWithType = tableNameWithType;
+    _partitionId = partitionId;
+    _primaryKeyColumns = dedupContext.getPrimaryKeyColumns();
+    _hashFunction = dedupContext.getHashFunction();
+    _serverMetrics = dedupContext.getServerMetrics();
+    _metadataTTL = dedupContext.getMetadataTTL() >= 0 ? 
dedupContext.getMetadataTTL() : 0;
+    _dedupTimeColumn = dedupContext.getDedupTimeColumn();
+    if (_metadataTTL > 0) {
+      Preconditions.checkArgument(_dedupTimeColumn != null,
+          "When metadataTTL is configured, metadata time column must be 
configured for dedup enabled table: %s",
+          tableNameWithType);
+    }
+    _logger = LoggerFactory.getLogger(tableNameWithType + "-" + partitionId + 
"-" + getClass().getSimpleName());
+  }
+
+  @Override
+  public boolean checkRecordPresentOrUpdate(PrimaryKey pk, IndexSegment 
indexSegment) {
+    throw new UnsupportedOperationException(
+        "checkRecordPresentOrUpdate(PrimaryKey pk, IndexSegment indexSegment) 
is " + "deprecated!");
+  }
+
+  @Override
+  public void addSegment(IndexSegment segment) {
+    if (!startOperation()) {
+      _logger.info("Skip adding segment: {} because dedup metadata manager is 
already stopped",
+          segment.getSegmentName());
+      return;
+    }
+    try {
+      addOrReplaceSegment(null, segment);
+    } catch (Exception e) {
+      throw new RuntimeException(
+          String.format("Caught exception while adding segment: %s of table: 
%s to %s", segment.getSegmentName(),
+              _tableNameWithType, this.getClass().getSimpleName()), e);
+    } finally {
+      finishOperation();
+    }
+  }
+
+  @Override
+  public void replaceSegment(IndexSegment oldSegment, IndexSegment newSegment) 
{
+    if (!startOperation()) {
+      _logger.info("Skip replacing segment: {} with segment: {} because dedup 
metadata manager is already stopped",
+          oldSegment.getSegmentName(), newSegment.getSegmentName());
+      return;
+    }
+    try {
+      addOrReplaceSegment(oldSegment, newSegment);
+    } catch (Exception e) {
+      throw new RuntimeException(
+          String.format("Caught exception while replacing segment: %s with 
segment: %s of table: %s in %s",
+              oldSegment.getSegmentName(), newSegment.getSegmentName(), 
_tableNameWithType,
+              this.getClass().getSimpleName()), e);
+    } finally {
+      finishOperation();
+    }
+  }
+
+  private void addOrReplaceSegment(@Nullable IndexSegment oldSegment, 
IndexSegment newSegment)
+      throws IOException {
+    try (DedupUtils.DedupRecordInfoReader dedupRecordInfoReader = new 
DedupUtils.DedupRecordInfoReader(newSegment,
+        _primaryKeyColumns, _dedupTimeColumn)) {
+      Iterator<DedupRecordInfo> dedupRecordInfoIterator =
+          DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader, 
newSegment.getSegmentMetadata().getTotalDocs());
+      doAddOrReplaceSegment(oldSegment, newSegment, dedupRecordInfoIterator);
+      updatePrimaryKeyGauge();
+    }
+  }
+
+  /**
+   * Adds the dedup metadata for the new segment if old segment is null; or 
replaces the dedup metadata for the given
+   * old segment with the new segment if the old segment is not null.
+   * @param oldSegment The old segment to replace. If null, add the new 
segment.
+   * @param newSegment The new segment to add or replace.
+   * @param dedupRecordInfoIteratorOfNewSegment The iterator of dedup record 
info of the new segment.
+   */
+  protected abstract void doAddOrReplaceSegment(@Nullable IndexSegment 
oldSegment, IndexSegment newSegment,
+      Iterator<DedupRecordInfo> dedupRecordInfoIteratorOfNewSegment);
+
+  @Override
+  public void removeSegment(IndexSegment segment) {
+    if (!startOperation()) {
+      _logger.info("Skip removing segment: {} because metadata manager is 
already stopped", segment.getSegmentName());
+      return;
+    }
+    try (DedupUtils.DedupRecordInfoReader dedupRecordInfoReader = new 
DedupUtils.DedupRecordInfoReader(segment,
+        _primaryKeyColumns, _dedupTimeColumn)) {
+      Iterator<DedupRecordInfo> dedupRecordInfoIterator =
+          DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader, 
segment.getSegmentMetadata().getTotalDocs());
+      doRemoveSegment(segment, dedupRecordInfoIterator);
+      updatePrimaryKeyGauge();
+    } catch (Exception e) {
+      throw new RuntimeException(
+          String.format("Caught exception while removing segment: %s of table: 
%s from %s", segment.getSegmentName(),
+              _tableNameWithType, this.getClass().getSimpleName()), e);
+    } finally {
+      finishOperation();
+    }
+  }
+
+  protected abstract void doRemoveSegment(IndexSegment segment, 
Iterator<DedupRecordInfo> dedupRecordInfoIterator);
+
+  @Override
+  public void removeExpiredPrimaryKeys() {
+    if (!startOperation()) {
+      _logger.info("Skip removing expired primary keys because metadata 
manager is already stopped");
+      return;
+    }
+    try {
+      long startTime = System.currentTimeMillis();
+      doRemoveExpiredPrimaryKeys();
+      long duration = System.currentTimeMillis() - startTime;
+      _serverMetrics.addTimedTableValue(_tableNameWithType, 
ServerTimer.DEDUP_REMOVE_EXPIRED_PRIMARY_KEYS_TIME_MS,
+          duration, TimeUnit.MILLISECONDS);
+    } finally {
+      finishOperation();
+    }
+  }
+
+  /**
+   * Removes all primary keys that have dedup time smaller than 
(largestSeenDedupTime - TTL).
+   */
+  protected abstract void doRemoveExpiredPrimaryKeys();
+
+  protected synchronized boolean startOperation() {
+    if (_stopped || _numPendingOperations == 0) {
+      return false;
+    }
+    _numPendingOperations++;
+    return true;
+  }
+
+  protected synchronized void finishOperation() {
+    _numPendingOperations--;
+    if (_numPendingOperations == 0) {
+      notifyAll();
+    }
+  }
+
+  @Override
+  public synchronized void stop() {
+    if (_stopped) {
+      _logger.warn("Metadata manager is already stopped");
+      return;
+    }
+    _stopped = true;
+    _numPendingOperations--;
+    _logger.info("Stopped the metadata manager with {} pending operations, 
current primary key count: {}",
+        _numPendingOperations, getNumPrimaryKeys());
+  }
+
+  @Override
+  public synchronized void close()
+      throws IOException {
+    Preconditions.checkState(_stopped, "Must stop the metadata manager before 
closing it");
+    if (_closed) {
+      _logger.warn("Metadata manager is already closed");
+      return;
+    }
+    _closed = true;
+    _logger.info("Closing the metadata manager");
+    while (_numPendingOperations != 0) {
+      _logger.info("Waiting for {} pending operations to finish", 
_numPendingOperations);
+      try {
+        wait();
+      } catch (InterruptedException e) {
+        throw new RuntimeException(
+            String.format("Interrupted while waiting for %d pending operations 
to finish", _numPendingOperations), e);
+      }
+    }
+    doClose();
+    // We don't remove the segment from the metadata manager when
+    // it's closed. This was done to make table deletion faster. Since we 
don't remove the segment, we never decrease
+    // the primary key count. So, we set the primary key count to 0 here.
+    updatePrimaryKeyGauge(0);
+    _logger.info("Closed the metadata manager");
+  }
+
+  protected abstract long getNumPrimaryKeys();
+
+  protected void updatePrimaryKeyGauge(long numPrimaryKeys) {
+    _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, 
ServerGauge.DEDUP_PRIMARY_KEYS_COUNT,
+        numPrimaryKeys);
+  }
+
+  protected void updatePrimaryKeyGauge() {
+    updatePrimaryKeyGauge(getNumPrimaryKeys());
+  }
+
+  protected void doClose()
+      throws IOException {
+  }
+}
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BaseTableDedupMetadataManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BaseTableDedupMetadataManager.java
index 8e1bac69d1..80639ebd5e 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BaseTableDedupMetadataManager.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BaseTableDedupMetadataManager.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.segment.local.dedup;
 
 import com.google.common.base.Preconditions;
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -26,32 +27,51 @@ import org.apache.commons.collections4.CollectionUtils;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
 import org.apache.pinot.spi.config.table.DedupConfig;
-import org.apache.pinot.spi.config.table.HashFunction;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
 
 
-abstract class BaseTableDedupMetadataManager implements 
TableDedupMetadataManager {
+public abstract class BaseTableDedupMetadataManager implements 
TableDedupMetadataManager {
   protected final Map<Integer, PartitionDedupMetadataManager> 
_partitionMetadataManagerMap = new ConcurrentHashMap<>();
   protected String _tableNameWithType;
-  protected List<String> _primaryKeyColumns;
-  protected ServerMetrics _serverMetrics;
-  protected HashFunction _hashFunction;
+  protected DedupContext _dedupContext;
 
   @Override
   public void init(TableConfig tableConfig, Schema schema, TableDataManager 
tableDataManager,
       ServerMetrics serverMetrics) {
     _tableNameWithType = tableConfig.getTableName();
 
-    _primaryKeyColumns = schema.getPrimaryKeyColumns();
-    Preconditions.checkArgument(!CollectionUtils.isEmpty(_primaryKeyColumns),
+    List<String> primaryKeyColumns = schema.getPrimaryKeyColumns();
+    Preconditions.checkArgument(!CollectionUtils.isEmpty(primaryKeyColumns),
         "Primary key columns must be configured for dedup enabled table: %s", 
_tableNameWithType);
 
-    _serverMetrics = serverMetrics;
-
     DedupConfig dedupConfig = tableConfig.getDedupConfig();
     Preconditions.checkArgument(dedupConfig != null, "Dedup must be enabled 
for table: %s", _tableNameWithType);
-    _hashFunction = dedupConfig.getHashFunction();
+    double metadataTTL = dedupConfig.getMetadataTTL();
+    String dedupTimeColumn = dedupConfig.getDedupTimeColumn();
+    if (dedupTimeColumn == null) {
+      dedupTimeColumn = tableConfig.getValidationConfig().getTimeColumnName();
+    }
+    if (metadataTTL > 0) {
+      Preconditions.checkArgument(dedupTimeColumn != null,
+          "When metadataTTL is configured, metadata time column or time column 
must be configured for "
+              + "dedup enabled table: %s", _tableNameWithType);
+    }
+
+    DedupContext.Builder dedupContextBuider = new DedupContext.Builder();
+    dedupContextBuider
+        .setTableConfig(tableConfig)
+        .setSchema(schema)
+        .setPrimaryKeyColumns(primaryKeyColumns)
+        .setHashFunction(dedupConfig.getHashFunction())
+        .setMetadataTTL(metadataTTL)
+        .setDedupTimeColumn(dedupTimeColumn)
+        .setTableIndexDir(tableDataManager.getTableDataDir())
+        .setTableDataManager(tableDataManager)
+        .setServerMetrics(serverMetrics);
+    _dedupContext = dedupContextBuider.build();
+
+    initCustomVariables();
   }
 
   public PartitionDedupMetadataManager getOrCreatePartitionManager(int 
partitionId) {
@@ -62,4 +82,25 @@ abstract class BaseTableDedupMetadataManager implements 
TableDedupMetadataManage
    * Create PartitionDedupMetadataManager for given partition id.
    */
   abstract protected PartitionDedupMetadataManager 
createPartitionDedupMetadataManager(Integer partitionId);
+
+  /**
+   * Can be overridden to initialize custom variables after other variables 
are set
+   */
+  protected void initCustomVariables() {
+  }
+
+  @Override
+  public void stop() {
+    for (PartitionDedupMetadataManager metadataManager : 
_partitionMetadataManagerMap.values()) {
+      metadataManager.stop();
+    }
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    for (PartitionDedupMetadataManager metadataManager : 
_partitionMetadataManagerMap.values()) {
+      metadataManager.close();
+    }
+  }
 }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManager.java
index a8925e2200..033611160c 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManager.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManager.java
@@ -19,106 +19,121 @@
 package org.apache.pinot.segment.local.dedup;
 
 import com.google.common.annotations.VisibleForTesting;
-import java.util.HashMap;
+import com.google.common.util.concurrent.AtomicDouble;
+import java.io.IOException;
 import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.common.metrics.ServerGauge;
-import org.apache.pinot.common.metrics.ServerMetrics;
-import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
 import org.apache.pinot.segment.local.utils.HashUtils;
 import org.apache.pinot.segment.spi.IndexSegment;
-import org.apache.pinot.spi.config.table.HashFunction;
-import org.apache.pinot.spi.data.readers.PrimaryKey;
-import org.apache.pinot.spi.utils.ByteArray;
 
-class ConcurrentMapPartitionDedupMetadataManager implements 
PartitionDedupMetadataManager {
-  private final String _tableNameWithType;
-  private final List<String> _primaryKeyColumns;
-  private final int _partitionId;
-  private final ServerMetrics _serverMetrics;
-  private final HashFunction _hashFunction;
 
+class ConcurrentMapPartitionDedupMetadataManager extends 
BasePartitionDedupMetadataManager {
   @VisibleForTesting
-  final ConcurrentHashMap<Object, IndexSegment> _primaryKeyToSegmentMap = new 
ConcurrentHashMap<>();
+  final AtomicDouble _largestSeenTime = new AtomicDouble(0);
+  @VisibleForTesting
+  final ConcurrentHashMap<Object, Pair<IndexSegment, Double>> 
_primaryKeyToSegmentAndTimeMap =
+      new ConcurrentHashMap<>();
 
-  public ConcurrentMapPartitionDedupMetadataManager(String tableNameWithType, 
List<String> primaryKeyColumns,
-      int partitionId, ServerMetrics serverMetrics, HashFunction hashFunction) 
{
-    _tableNameWithType = tableNameWithType;
-    _primaryKeyColumns = primaryKeyColumns;
-    _partitionId = partitionId;
-    _serverMetrics = serverMetrics;
-    _hashFunction = hashFunction;
+  protected ConcurrentMapPartitionDedupMetadataManager(String 
tableNameWithType, int partitionId,
+      DedupContext dedupContext) {
+    super(tableNameWithType, partitionId, dedupContext);
   }
 
-  public void addSegment(IndexSegment segment) {
-    // Add all PKs to _primaryKeyToSegmentMap
-    Iterator<PrimaryKey> primaryKeyIterator = getPrimaryKeyIterator(segment);
-    while (primaryKeyIterator.hasNext()) {
-      PrimaryKey pk = primaryKeyIterator.next();
-      _primaryKeyToSegmentMap.put(HashUtils.hashPrimaryKey(pk, _hashFunction), 
segment);
+  @Override
+  protected void doAddOrReplaceSegment(IndexSegment oldSegment, IndexSegment 
newSegment,
+      Iterator<DedupRecordInfo> dedupRecordInfoIteratorOfNewSegment) {
+    String segmentName = newSegment.getSegmentName();
+    while (dedupRecordInfoIteratorOfNewSegment.hasNext()) {
+      DedupRecordInfo dedupRecordInfo = 
dedupRecordInfoIteratorOfNewSegment.next();
+      double dedupTime = dedupRecordInfo.getDedupTime();
+      _largestSeenTime.getAndUpdate(time -> Math.max(time, dedupTime));
+      
_primaryKeyToSegmentAndTimeMap.compute(HashUtils.hashPrimaryKey(dedupRecordInfo.getPrimaryKey(),
 _hashFunction),
+          (primaryKey, segmentAndTime) -> {
+            if (segmentAndTime == null) {
+              return Pair.of(newSegment, dedupTime);
+            }
+            // when oldSegment is null, it means we are adding a new segment
+            // when oldSegment is not null, it means we are replacing an 
existing segment
+            if (oldSegment == null) {
+              _logger.warn("When adding a new segment: record in segment: {} 
with primary key: {} and dedup "
+                      + "time: {} already exists in segment: {} with dedup 
time: {}", segmentName,
+                  dedupRecordInfo.getPrimaryKey(), dedupTime, 
segmentAndTime.getLeft().getSegmentName(),
+                  segmentAndTime.getRight());
+            } else {
+              if (segmentAndTime.getLeft() != oldSegment) {
+                _logger.warn("When replacing a segment: record in segment: {} 
with primary key: {} and dedup "
+                        + "time: {} exists in segment: {} (but not the 
segment: {} to replace) with dedup time: {}",
+                    segmentName, dedupRecordInfo.getPrimaryKey(), dedupTime, 
segmentAndTime.getLeft().getSegmentName(),
+                    oldSegment.getSegmentName(), segmentAndTime.getRight());
+              }
+            }
+            // When dedup time is the same, we always keep the latest segment
+            // This will handle segment replacement case correctly - a typical 
case is when a mutable segment is
+            // replaced by an immutable segment
+            if (segmentAndTime.getRight() <= dedupTime) {
+              return Pair.of(newSegment, dedupTime);
+            }
+            return segmentAndTime;
+          });
     }
-    _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, 
ServerGauge.DEDUP_PRIMARY_KEYS_COUNT,
-        _primaryKeyToSegmentMap.size());
   }
 
-  public void removeSegment(IndexSegment segment) {
-    // TODO(saurabh): Explain reload scenario here
-    Iterator<PrimaryKey> primaryKeyIterator = getPrimaryKeyIterator(segment);
-    while (primaryKeyIterator.hasNext()) {
-      PrimaryKey pk = primaryKeyIterator.next();
-      _primaryKeyToSegmentMap.compute(HashUtils.hashPrimaryKey(pk, 
_hashFunction), (primaryKey, currentSegment) -> {
-        if (currentSegment == segment) {
-          return null;
-        } else {
-          return currentSegment;
-        }
-      });
+  @Override
+  protected void doRemoveSegment(IndexSegment segment, 
Iterator<DedupRecordInfo> dedupRecordInfoIterator) {
+    while (dedupRecordInfoIterator.hasNext()) {
+      DedupRecordInfo dedupRecordInfo = dedupRecordInfoIterator.next();
+      _primaryKeyToSegmentAndTimeMap.computeIfPresent(
+          HashUtils.hashPrimaryKey(dedupRecordInfo.getPrimaryKey(), 
_hashFunction), (primaryKey, segmentAndTime) -> {
+            // do not need to compare dedup time because we are removing the 
segment
+            if (segmentAndTime.getLeft() == segment) {
+              return null;
+            } else {
+              return segmentAndTime;
+            }
+          });
     }
-    _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, 
ServerGauge.DEDUP_PRIMARY_KEYS_COUNT,
-        _primaryKeyToSegmentMap.size());
   }
 
-  @VisibleForTesting
-  Iterator<PrimaryKey> getPrimaryKeyIterator(IndexSegment segment) {
-    Map<String, PinotSegmentColumnReader> columnToReaderMap = new HashMap<>();
-    for (String primaryKeyColumn : _primaryKeyColumns) {
-      columnToReaderMap.put(primaryKeyColumn, new 
PinotSegmentColumnReader(segment, primaryKeyColumn));
+  @Override
+  protected void doRemoveExpiredPrimaryKeys() {
+    if (_metadataTTL > 0) {
+      double smallestTimeToKeep = _largestSeenTime.get() - _metadataTTL;
+      _primaryKeyToSegmentAndTimeMap.entrySet().removeIf(entry -> 
entry.getValue().getRight() < smallestTimeToKeep);
     }
-    int numTotalDocs = segment.getSegmentMetadata().getTotalDocs();
-    int numPrimaryKeyColumns = _primaryKeyColumns.size();
-    return new Iterator<PrimaryKey>() {
-      private int _docId = 0;
+  }
 
-      @Override
-      public boolean hasNext() {
-        return _docId < numTotalDocs;
+  @Override
+  public boolean checkRecordPresentOrUpdate(DedupRecordInfo dedupRecordInfo, 
IndexSegment indexSegment) {
+    if (!startOperation()) {
+      _logger.info("Skip adding record to {} because metadata manager is 
already stopped",
+          indexSegment.getSegmentName());
+      return true;
+    }
+    try {
+      _largestSeenTime.getAndUpdate(time -> Math.max(time, 
dedupRecordInfo.getDedupTime()));
+      boolean present = _primaryKeyToSegmentAndTimeMap.putIfAbsent(
+          HashUtils.hashPrimaryKey(dedupRecordInfo.getPrimaryKey(), 
_hashFunction),
+          Pair.of(indexSegment, dedupRecordInfo.getDedupTime())) != null;
+      if (!present) {
+        _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, 
_partitionId, ServerGauge.DEDUP_PRIMARY_KEYS_COUNT,
+            _primaryKeyToSegmentAndTimeMap.size());
       }
+      return present;
+    } finally {
+      finishOperation();
+    }
+  }
 
-      @Override
-      public PrimaryKey next() {
-        Object[] values = new Object[numPrimaryKeyColumns];
-        for (int i = 0; i < numPrimaryKeyColumns; i++) {
-          Object value = 
columnToReaderMap.get(_primaryKeyColumns.get(i)).getValue(_docId);
-          if (value instanceof byte[]) {
-            value = new ByteArray((byte[]) value);
-          }
-          values[i] = value;
-        }
-        _docId++;
-        return new PrimaryKey(values);
-      }
-    };
+  @Override
+  protected long getNumPrimaryKeys() {
+    return _primaryKeyToSegmentAndTimeMap.size();
   }
 
-  public boolean checkRecordPresentOrUpdate(PrimaryKey pk, IndexSegment 
indexSegment) {
-    boolean present =
-        _primaryKeyToSegmentMap.putIfAbsent(HashUtils.hashPrimaryKey(pk, 
_hashFunction), indexSegment) != null;
-    if (!present) {
-      _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, 
_partitionId, ServerGauge.DEDUP_PRIMARY_KEYS_COUNT,
-          _primaryKeyToSegmentMap.size());
-    }
-    return present;
+  @Override
+  protected void doClose()
+      throws IOException {
+    _primaryKeyToSegmentAndTimeMap.clear();
   }
 }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/ConcurrentMapTableDedupMetadataManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/ConcurrentMapTableDedupMetadataManager.java
index 0f3de3aa96..4bb7929ffe 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/ConcurrentMapTableDedupMetadataManager.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/ConcurrentMapTableDedupMetadataManager.java
@@ -18,10 +18,9 @@
  */
 package org.apache.pinot.segment.local.dedup;
 
-class ConcurrentMapTableDedupMetadataManager extends 
BaseTableDedupMetadataManager {
 
+class ConcurrentMapTableDedupMetadataManager extends 
BaseTableDedupMetadataManager {
   protected PartitionDedupMetadataManager 
createPartitionDedupMetadataManager(Integer partitionId) {
-    return new ConcurrentMapPartitionDedupMetadataManager(_tableNameWithType, 
_primaryKeyColumns, partitionId,
-        _serverMetrics, _hashFunction);
+    return new ConcurrentMapPartitionDedupMetadataManager(_tableNameWithType, 
partitionId, _dedupContext);
   }
 }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/DedupContext.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/DedupContext.java
new file mode 100644
index 0000000000..a523f26957
--- /dev/null
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/DedupContext.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.dedup;
+
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.util.List;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.spi.config.table.HashFunction;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+
+
+public class DedupContext {
+  private final TableConfig _tableConfig;
+  private final Schema _schema;
+  private final List<String> _primaryKeyColumns;
+  private final HashFunction _hashFunction;
+  private final double _metadataTTL;
+  private final String _dedupTimeColumn;
+  private final File _tableIndexDir;
+  private final TableDataManager _tableDataManager;
+  private final ServerMetrics _serverMetrics;
+
+  private DedupContext(TableConfig tableConfig, Schema schema, List<String> 
primaryKeyColumns,
+      HashFunction hashFunction, double metadataTTL, String dedupTimeColumn, 
File tableIndexDir,
+      TableDataManager tableDataManager, ServerMetrics serverMetrics) {
+    _tableConfig = tableConfig;
+    _schema = schema;
+    _primaryKeyColumns = primaryKeyColumns;
+    _hashFunction = hashFunction;
+    _metadataTTL = metadataTTL;
+    _dedupTimeColumn = dedupTimeColumn;
+    _tableIndexDir = tableIndexDir;
+    _tableDataManager = tableDataManager;
+    _serverMetrics = serverMetrics;
+  }
+
+  public TableConfig getTableConfig() {
+    return _tableConfig;
+  }
+
+  public Schema getSchema() {
+    return _schema;
+  }
+
+  public List<String> getPrimaryKeyColumns() {
+    return _primaryKeyColumns;
+  }
+
+  public HashFunction getHashFunction() {
+    return _hashFunction;
+  }
+
+  public double getMetadataTTL() {
+    return _metadataTTL;
+  }
+
+  public String getDedupTimeColumn() {
+    return _dedupTimeColumn;
+  }
+
+  public File getTableIndexDir() {
+    return _tableIndexDir;
+  }
+
+  public TableDataManager getTableDataManager() {
+    return _tableDataManager;
+  }
+
+  public ServerMetrics getServerMetrics() {
+    return _serverMetrics;
+  }
+
+  public static class Builder {
+    private TableConfig _tableConfig;
+    private Schema _schema;
+    private List<String> _primaryKeyColumns;
+    private HashFunction _hashFunction;
+    private double _metadataTTL;
+    private String _dedupTimeColumn;
+    private File _tableIndexDir;
+    private TableDataManager _tableDataManager;
+    private ServerMetrics _serverMetrics;
+
+    public Builder setTableConfig(TableConfig tableConfig) {
+      _tableConfig = tableConfig;
+      return this;
+    }
+
+    public Builder setSchema(Schema schema) {
+      _schema = schema;
+      return this;
+    }
+
+    public Builder setPrimaryKeyColumns(List<String> primaryKeyColumns) {
+      _primaryKeyColumns = primaryKeyColumns;
+      return this;
+    }
+
+    public Builder setHashFunction(HashFunction hashFunction) {
+      _hashFunction = hashFunction;
+      return this;
+    }
+
+    public Builder setMetadataTTL(double metadataTTL) {
+      _metadataTTL = metadataTTL;
+      return this;
+    }
+
+    public Builder setDedupTimeColumn(String deupTimeColumn) {
+      _dedupTimeColumn = deupTimeColumn;
+      return this;
+    }
+
+    public Builder setTableIndexDir(File tableIndexDir) {
+      _tableIndexDir = tableIndexDir;
+      return this;
+    }
+
+    public Builder setTableDataManager(TableDataManager tableDataManager) {
+      _tableDataManager = tableDataManager;
+      return this;
+    }
+
+    public Builder setServerMetrics(ServerMetrics serverMetrics) {
+      _serverMetrics = serverMetrics;
+      return this;
+    }
+
+    public DedupContext build() {
+      Preconditions.checkState(_tableConfig != null, "Table config must be 
set");
+      Preconditions.checkState(_schema != null, "Schema must be set");
+      Preconditions.checkState(CollectionUtils.isNotEmpty(_primaryKeyColumns), 
"Primary key columns must be set");
+      Preconditions.checkState(_hashFunction != null, "Hash function must be 
set");
+      Preconditions.checkState(_tableIndexDir != null, "Table index directory 
must be set");
+      return new DedupContext(_tableConfig, _schema, _primaryKeyColumns, 
_hashFunction, _metadataTTL, _dedupTimeColumn,
+          _tableIndexDir, _tableDataManager, _serverMetrics);
+    }
+  }
+}
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/ConcurrentMapTableDedupMetadataManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/DedupRecordInfo.java
similarity index 62%
copy from 
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/ConcurrentMapTableDedupMetadataManager.java
copy to 
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/DedupRecordInfo.java
index 0f3de3aa96..191605076d 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/ConcurrentMapTableDedupMetadataManager.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/DedupRecordInfo.java
@@ -18,10 +18,27 @@
  */
 package org.apache.pinot.segment.local.dedup;
 
-class ConcurrentMapTableDedupMetadataManager extends 
BaseTableDedupMetadataManager {
+import org.apache.pinot.spi.data.readers.PrimaryKey;
 
-  protected PartitionDedupMetadataManager 
createPartitionDedupMetadataManager(Integer partitionId) {
-    return new ConcurrentMapPartitionDedupMetadataManager(_tableNameWithType, 
_primaryKeyColumns, partitionId,
-        _serverMetrics, _hashFunction);
+
+public class DedupRecordInfo {
+  private final PrimaryKey _primaryKey;
+  private final double _dedupTime;
+
+  public DedupRecordInfo(PrimaryKey primaryKey, double dedupTime) {
+    _primaryKey = primaryKey;
+    _dedupTime = dedupTime;
+  }
+
+  public DedupRecordInfo(PrimaryKey primaryKey) {
+    this(primaryKey, Double.MIN_VALUE);
+  }
+
+  public PrimaryKey getPrimaryKey() {
+    return _primaryKey;
+  }
+
+  public double getDedupTime() {
+    return _dedupTime;
   }
 }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/DedupUtils.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/DedupUtils.java
new file mode 100644
index 0000000000..2b9c1c3cda
--- /dev/null
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/DedupUtils.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.dedup;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
+import org.apache.pinot.segment.local.segment.readers.PrimaryKeyReader;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.spi.data.readers.PrimaryKey;
+
+
+public class DedupUtils {
+  private DedupUtils() {
+  }
+
+  public static class DedupRecordInfoReader implements Closeable {
+    private final PrimaryKeyReader _primaryKeyReader;
+    private final PinotSegmentColumnReader _dedupTimeColumnReader;
+
+    public DedupRecordInfoReader(IndexSegment segment, List<String> 
primaryKeyColumns,
+        @Nullable String dedupTimeColumn) {
+      _primaryKeyReader = new PrimaryKeyReader(segment, primaryKeyColumns);
+      if (dedupTimeColumn != null) {
+        _dedupTimeColumnReader = new PinotSegmentColumnReader(segment, 
dedupTimeColumn);
+      } else {
+        _dedupTimeColumnReader = null;
+      }
+    }
+
+    @VisibleForTesting
+    public DedupRecordInfoReader(PrimaryKeyReader primaryKeyReader,
+        @Nullable PinotSegmentColumnReader dedupTimeColumnReader) {
+      _primaryKeyReader = primaryKeyReader;
+      _dedupTimeColumnReader = dedupTimeColumnReader;
+    }
+
+    public DedupRecordInfo getDedupRecordInfo(int docId) {
+      PrimaryKey primaryKey = _primaryKeyReader.getPrimaryKey(docId);
+      double dedupTime =
+          (_dedupTimeColumnReader != null) ? ((Number) 
_dedupTimeColumnReader.getValue(docId)).doubleValue()
+              : Double.MIN_VALUE;
+      return new DedupRecordInfo(primaryKey, dedupTime);
+    }
+
+    @Override
+    public void close()
+        throws IOException {
+      _primaryKeyReader.close();
+      if (_dedupTimeColumnReader != null) {
+        _dedupTimeColumnReader.close();
+      }
+    }
+  }
+
+  /**
+   * Returns an iterator of {@link DedupRecordInfo} for all the documents from 
the segment.
+   */
+  public static Iterator<DedupRecordInfo> 
getDedupRecordInfoIterator(DedupRecordInfoReader dedupRecordInfoReader,
+      int numDocs) {
+    return new Iterator<DedupRecordInfo>() {
+      private int _docId = 0;
+
+      @Override
+      public boolean hasNext() {
+        return _docId < numDocs;
+      }
+
+      @Override
+      public DedupRecordInfo next() {
+        return dedupRecordInfoReader.getDedupRecordInfo(_docId++);
+      }
+    };
+  }
+}
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManager.java
index 82dfc9d5ed..835ce6dfa7 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManager.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManager.java
@@ -18,24 +18,58 @@
  */
 package org.apache.pinot.segment.local.dedup;
 
+import java.io.Closeable;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.spi.data.readers.PrimaryKey;
 
 
-public interface PartitionDedupMetadataManager {
+public interface PartitionDedupMetadataManager extends Closeable {
   /**
    * Initializes the dedup metadata for the given immutable segment.
    */
-  public void addSegment(IndexSegment segment);
+  void addSegment(IndexSegment segment);
+
+  /**
+   * Replaces the dedup metadata for the given old segment with the new 
segment.
+   */
+  default void replaceSegment(IndexSegment oldSegment, IndexSegment 
newSegment) {
+    // since this is a newly added method, by default, add the new segment to 
keep backward compatibility
+    addSegment(newSegment);
+  }
 
   /**
    * Removes the dedup metadata for the given segment.
    */
-  public void removeSegment(IndexSegment segment);
+  void removeSegment(IndexSegment segment);
+
+  /**
+   * Remove the expired primary keys from the metadata when TTL is enabled.
+   */
+  void removeExpiredPrimaryKeys();
 
   /**
-   * Add the primary key to the given segment to the dedup matadata if it was 
absent.
-   * Returns true if the key was already present.
+   * Add the primary key to the given segment to the dedup metadata if it is 
absent.
+   * Returns true if the key was already present, i.e., the new record 
associated with the given {@link PrimaryKey}
+   * is a duplicate and should be skipped/dropped.
    */
+  @Deprecated
   boolean checkRecordPresentOrUpdate(PrimaryKey pk, IndexSegment indexSegment);
+
+  /**
+   * Add the primary key to the given segment to the dedup metadata if it is 
absent and within the retention time.
+   * Returns true if the key was already present, i.e., the new record 
associated with the given {@link DedupRecordInfo}
+   * is a duplicate and should be skipped/dropped.
+   * @param dedupRecordInfo  The primary key and the dedup time.
+   * @param indexSegment  The segment to which the record belongs.
+   * @return true if the key was already present, i.e., the new record 
associated with the given {@link DedupRecordInfo}
+   * is a duplicate and should be skipped/dropped.
+   */
+  default boolean checkRecordPresentOrUpdate(DedupRecordInfo dedupRecordInfo, 
IndexSegment indexSegment) {
+    return checkRecordPresentOrUpdate(dedupRecordInfo.getPrimaryKey(), 
indexSegment);
+  }
+
+  /**
+   * Stops the metadata manager. After invoking this method, no access to the 
metadata will be accepted.
+   */
+  void stop();
 }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManager.java
index aa900f75db..5c0bd1d830 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManager.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManager.java
@@ -18,13 +18,14 @@
  */
 package org.apache.pinot.segment.local.dedup;
 
+import java.io.Closeable;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
 
 
-public interface TableDedupMetadataManager {
+public interface TableDedupMetadataManager extends Closeable {
   /**
    * Initialize TableDedupMetadataManager.
    */
@@ -34,4 +35,9 @@ public interface TableDedupMetadataManager {
    * Create a new PartitionDedupMetadataManager if not present already, 
otherwise return existing one.
    */
   PartitionDedupMetadataManager getOrCreatePartitionManager(int partitionId);
+
+  /**
+   * Stops the metadata manager. After invoking this method, no access to the 
metadata will be accepted.
+   */
+  void stop();
 }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
index 847f589f2a..730d7a02ea 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
@@ -47,6 +47,7 @@ import 
org.apache.pinot.common.request.context.FunctionContext;
 import org.apache.pinot.common.request.context.RequestContextUtils;
 import org.apache.pinot.segment.local.aggregator.ValueAggregator;
 import org.apache.pinot.segment.local.aggregator.ValueAggregatorFactory;
+import org.apache.pinot.segment.local.dedup.DedupRecordInfo;
 import org.apache.pinot.segment.local.dedup.PartitionDedupMetadataManager;
 import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig;
 import 
org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentStatsHistory;
@@ -158,6 +159,7 @@ public class MutableSegmentImpl implements MutableSegment {
   private volatile long _latestIngestionTimeMs = Long.MIN_VALUE;
 
   private final PartitionDedupMetadataManager _partitionDedupMetadataManager;
+  private final String _dedupTimeColumn;
 
   private final PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
   private final List<String> _upsertComparisonColumns;
@@ -354,6 +356,11 @@ public class MutableSegmentImpl implements MutableSegment {
     }
 
     _partitionDedupMetadataManager = config.getPartitionDedupMetadataManager();
+    if (_partitionDedupMetadataManager != null) {
+      _dedupTimeColumn = config.getDedupTimeColumn() == null ? _timeColumnName 
: config.getDedupTimeColumn();
+    } else {
+      _dedupTimeColumn = null;
+    }
 
     _partitionUpsertMetadataManager = 
config.getPartitionUpsertMetadataManager();
     if (_partitionUpsertMetadataManager != null) {
@@ -469,8 +476,8 @@ public class MutableSegmentImpl implements MutableSegment {
     int numDocsIndexed = _numDocsIndexed;
 
     if (isDedupEnabled()) {
-      PrimaryKey primaryKey = 
row.getPrimaryKey(_schema.getPrimaryKeyColumns());
-      if 
(_partitionDedupMetadataManager.checkRecordPresentOrUpdate(primaryKey, this)) {
+      DedupRecordInfo dedupRecordInfo = getDedupRecordInfo(row);
+      if 
(_partitionDedupMetadataManager.checkRecordPresentOrUpdate(dedupRecordInfo, 
this)) {
         if (_serverMetrics != null) {
           _serverMetrics.addMeteredTableValue(_realtimeTableName, 
ServerMeter.REALTIME_DEDUP_DROPPED, 1);
         }
@@ -536,6 +543,16 @@ public class MutableSegmentImpl implements MutableSegment {
     return _partitionDedupMetadataManager != null;
   }
 
+  private DedupRecordInfo getDedupRecordInfo(GenericRow row) {
+    PrimaryKey primaryKey = row.getPrimaryKey(_schema.getPrimaryKeyColumns());
+    // it is okay to not have a dedup time column if metadata TTL is not enabled
+    if (_dedupTimeColumn == null) {
+      return new DedupRecordInfo(primaryKey);
+    }
+    double dedupTime = ((Number) row.getValue(_dedupTimeColumn)).doubleValue();
+    return new DedupRecordInfo(primaryKey, dedupTime);
+  }
+
   private RecordInfo getRecordInfo(GenericRow row, int docId) {
     PrimaryKey primaryKey = row.getPrimaryKey(_schema.getPrimaryKeyColumns());
     Comparable comparisonValue = getComparisonValue(row);
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java
index 956135f7ca..2d5e7d10d2 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java
@@ -65,6 +65,7 @@ public class RealtimeSegmentConfig {
   private final String _upsertOutOfOrderRecordColumn;
   private final boolean _upsertDropOutOfOrderRecord;
   private final PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
+  private final String _dedupTimeColumn;
   private final PartitionDedupMetadataManager _partitionDedupMetadataManager;
   private final String _consumerDir;
   private final List<FieldConfig> _fieldConfigList;
@@ -78,7 +79,7 @@ public class RealtimeSegmentConfig {
       PartitionFunction partitionFunction, int partitionId, boolean 
aggregateMetrics, boolean nullHandlingEnabled,
       String consumerDir, UpsertConfig.Mode upsertMode, List<String> 
upsertComparisonColumns,
       String upsertDeleteRecordColumn, String upsertOutOfOrderRecordColumn, 
boolean upsertDropOutOfOrderRecord,
-      PartitionUpsertMetadataManager partitionUpsertMetadataManager,
+      PartitionUpsertMetadataManager partitionUpsertMetadataManager, String 
dedupTimeColumn,
       PartitionDedupMetadataManager partitionDedupMetadataManager, 
List<FieldConfig> fieldConfigList,
       List<AggregationConfig> ingestionAggregationConfigs) {
     _tableNameWithType = tableNameWithType;
@@ -105,6 +106,7 @@ public class RealtimeSegmentConfig {
     _upsertOutOfOrderRecordColumn = upsertOutOfOrderRecordColumn;
     _upsertDropOutOfOrderRecord = upsertDropOutOfOrderRecord;
     _partitionUpsertMetadataManager = partitionUpsertMetadataManager;
+    _dedupTimeColumn = dedupTimeColumn;
     _partitionDedupMetadataManager = partitionDedupMetadataManager;
     _fieldConfigList = fieldConfigList;
     _ingestionAggregationConfigs = ingestionAggregationConfigs;
@@ -210,6 +212,10 @@ public class RealtimeSegmentConfig {
     return _partitionUpsertMetadataManager;
   }
 
+  public String getDedupTimeColumn() {
+    return _dedupTimeColumn;
+  }
+
   public PartitionDedupMetadataManager getPartitionDedupMetadataManager() {
     return _partitionDedupMetadataManager;
   }
@@ -247,6 +253,7 @@ public class RealtimeSegmentConfig {
     private String _upsertOutOfOrderRecordColumn;
     private boolean _upsertDropOutOfOrderRecord;
     private PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
+    private String _dedupTimeColumn;
     private PartitionDedupMetadataManager _partitionDedupMetadataManager;
     private List<FieldConfig> _fieldConfigList;
     private List<AggregationConfig> _ingestionAggregationConfigs;
@@ -401,6 +408,11 @@ public class RealtimeSegmentConfig {
       return this;
     }
 
+    public Builder setDedupTimeColumn(String dedupTimeColumn) {
+      _dedupTimeColumn = dedupTimeColumn;
+      return this;
+    }
+
     public Builder 
setPartitionDedupMetadataManager(PartitionDedupMetadataManager 
partitionDedupMetadataManager) {
       _partitionDedupMetadataManager = partitionDedupMetadataManager;
       return this;
@@ -427,7 +439,7 @@ public class RealtimeSegmentConfig {
           _memoryManager, _statsHistory, _partitionColumn, _partitionFunction, 
_partitionId, _aggregateMetrics,
           _nullHandlingEnabled, _consumerDir, _upsertMode, 
_upsertComparisonColumns, _upsertDeleteRecordColumn,
           _upsertOutOfOrderRecordColumn, _upsertDropOutOfOrderRecord,
-          _partitionUpsertMetadataManager, _partitionDedupMetadataManager, 
_fieldConfigList,
+          _partitionUpsertMetadataManager, _dedupTimeColumn, 
_partitionDedupMetadataManager, _fieldConfigList,
           _ingestionAggregationConfigs);
     }
   }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PrimaryKeyReader.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PrimaryKeyReader.java
new file mode 100644
index 0000000000..551d242b7c
--- /dev/null
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PrimaryKeyReader.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.readers;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.spi.data.readers.PrimaryKey;
+import org.apache.pinot.spi.utils.ByteArray;
+
+
+public class PrimaryKeyReader implements Closeable {
+  public final List<PinotSegmentColumnReader> _primaryKeyColumnReaders;
+
+  public PrimaryKeyReader(IndexSegment segment, List<String> 
primaryKeyColumns) {
+    _primaryKeyColumnReaders = new ArrayList<>(primaryKeyColumns.size());
+    for (String primaryKeyColumn : primaryKeyColumns) {
+      _primaryKeyColumnReaders.add(new PinotSegmentColumnReader(segment, 
primaryKeyColumn));
+    }
+  }
+
+  public PrimaryKey getPrimaryKey(int docId) {
+    int numPrimaryKeys = _primaryKeyColumnReaders.size();
+    Object[] values = new Object[numPrimaryKeys];
+    for (int i = 0; i < numPrimaryKeys; i++) {
+      values[i] = getValue(_primaryKeyColumnReaders.get(i), docId);
+    }
+    return new PrimaryKey(values);
+  }
+
+  public void getPrimaryKey(int docId, PrimaryKey buffer) {
+    Object[] values = buffer.getValues();
+    int numPrimaryKeys = values.length;
+    for (int i = 0; i < numPrimaryKeys; i++) {
+      values[i] = getValue(_primaryKeyColumnReaders.get(i), docId);
+    }
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    for (PinotSegmentColumnReader primaryKeyColumnReader : 
_primaryKeyColumnReaders) {
+      primaryKeyColumnReader.close();
+    }
+  }
+
+  private static Object getValue(PinotSegmentColumnReader columnReader, int 
docId) {
+    Object value = columnReader.getValue(docId);
+    return value instanceof byte[] ? new ByteArray((byte[]) value) : value;
+  }
+}
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
index 9ae0c5b746..6951fc8197 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -65,6 +65,7 @@ import 
org.apache.pinot.segment.local.indexsegment.immutable.EmptyIndexSegment;
 import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
+import org.apache.pinot.segment.local.segment.readers.PrimaryKeyReader;
 import org.apache.pinot.segment.local.utils.HashUtils;
 import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.IndexSegment;
@@ -740,8 +741,7 @@ public abstract class BasePartitionUpsertMetadataManager 
implements PartitionUps
   }
 
   protected void removeSegment(IndexSegment segment, MutableRoaringBitmap 
validDocIds) {
-    try (
-        UpsertUtils.PrimaryKeyReader primaryKeyReader = new 
UpsertUtils.PrimaryKeyReader(segment, _primaryKeyColumns)) {
+    try (PrimaryKeyReader primaryKeyReader = new PrimaryKeyReader(segment, 
_primaryKeyColumns)) {
       removeSegment(segment, 
UpsertUtils.getPrimaryKeyIterator(primaryKeyReader, validDocIds));
     } catch (Exception e) {
       throw new RuntimeException(
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java
index 7e8f510737..7054b1c030 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java
@@ -33,6 +33,7 @@ import javax.annotation.concurrent.ThreadSafe;
 import org.apache.pinot.common.metrics.ServerMeter;
 import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
 import org.apache.pinot.segment.local.segment.readers.LazyRow;
+import org.apache.pinot.segment.local.segment.readers.PrimaryKeyReader;
 import org.apache.pinot.segment.local.utils.HashUtils;
 import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.IndexSegment;
@@ -187,7 +188,7 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes
     long startTimeMs = System.currentTimeMillis();
 
     try (
-        UpsertUtils.PrimaryKeyReader primaryKeyReader = new 
UpsertUtils.PrimaryKeyReader(segment, _primaryKeyColumns)) {
+        PrimaryKeyReader primaryKeyReader = new PrimaryKeyReader(segment, 
_primaryKeyColumns)) {
       removeSegment(segment,
           UpsertUtils.getPrimaryKeyIterator(primaryKeyReader, 
segment.getSegmentMetadata().getTotalDocs()));
     } catch (Exception e) {
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
index 47d6a72743..3086da1969 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
@@ -20,11 +20,11 @@ package org.apache.pinot.segment.local.upsert;
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
+import org.apache.pinot.segment.local.segment.readers.PrimaryKeyReader;
 import org.apache.pinot.segment.spi.IndexSegment;
 import 
org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
 import org.apache.pinot.spi.data.readers.PrimaryKey;
@@ -161,48 +161,10 @@ public class UpsertUtils {
         throws IOException {
       _primaryKeyReader.close();
       _comparisonColumnReader.close();
-    }
-  }
-
-  public static class PrimaryKeyReader implements Closeable {
-    public final List<PinotSegmentColumnReader> _primaryKeyColumnReaders;
-
-    public PrimaryKeyReader(IndexSegment segment, List<String> 
primaryKeyColumns) {
-      _primaryKeyColumnReaders = new ArrayList<>(primaryKeyColumns.size());
-      for (String primaryKeyColumn : primaryKeyColumns) {
-        _primaryKeyColumnReaders.add(new PinotSegmentColumnReader(segment, 
primaryKeyColumn));
-      }
-    }
-
-    public PrimaryKey getPrimaryKey(int docId) {
-      int numPrimaryKeys = _primaryKeyColumnReaders.size();
-      Object[] values = new Object[numPrimaryKeys];
-      for (int i = 0; i < numPrimaryKeys; i++) {
-        values[i] = getValue(_primaryKeyColumnReaders.get(i), docId);
-      }
-      return new PrimaryKey(values);
-    }
-
-    public void getPrimaryKey(int docId, PrimaryKey buffer) {
-      Object[] values = buffer.getValues();
-      int numPrimaryKeys = values.length;
-      for (int i = 0; i < numPrimaryKeys; i++) {
-        values[i] = getValue(_primaryKeyColumnReaders.get(i), docId);
+      if (_deleteRecordColumnReader != null) {
+        _deleteRecordColumnReader.close();
       }
     }
-
-    @Override
-    public void close()
-        throws IOException {
-      for (PinotSegmentColumnReader primaryKeyColumnReader : 
_primaryKeyColumnReaders) {
-        primaryKeyColumnReader.close();
-      }
-    }
-  }
-
-  static Object getValue(PinotSegmentColumnReader columnReader, int docId) {
-    Object value = columnReader.getValue(docId);
-    return value instanceof byte[] ? new ByteArray((byte[]) value) : value;
   }
 
   public interface ComparisonColumnReader extends Closeable {
@@ -246,7 +208,7 @@ public class UpsertUtils {
         PinotSegmentColumnReader columnReader = _comparisonColumnReaders[i];
         Comparable comparisonValue = null;
         if (!columnReader.isNull(docId)) {
-          comparisonValue = (Comparable) UpsertUtils.getValue(columnReader, 
docId);
+          comparisonValue = (Comparable) getValue(columnReader, docId);
         }
         comparisonColumns[i] = comparisonValue;
       }
@@ -254,6 +216,11 @@ public class UpsertUtils {
       return new ComparisonColumns(comparisonColumns, 
ComparisonColumns.SEALED_SEGMENT_COMPARISON_INDEX);
     }
 
+    private static Object getValue(PinotSegmentColumnReader columnReader, int 
docId) {
+      Object value = columnReader.getValue(docId);
+      return value instanceof byte[] ? new ByteArray((byte[]) value) : value;
+    }
+
     @Override
     public void close()
         throws IOException {
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManagerWithTTLTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManagerWithTTLTest.java
new file mode 100644
index 0000000000..6349caac74
--- /dev/null
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManagerWithTTLTest.java
@@ -0,0 +1,291 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.dedup;
+
+import java.io.File;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
+import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
+import org.apache.pinot.segment.local.segment.readers.PrimaryKeyReader;
+import org.apache.pinot.segment.local.utils.HashUtils;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.spi.config.table.HashFunction;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.PrimaryKey;
+import org.mockito.Mockito;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.testng.Assert.*;
+
+
+public class ConcurrentMapPartitionDedupMetadataManagerWithTTLTest {
+  private static final int METADATA_TTL = 10000;
+  private ConcurrentMapPartitionDedupMetadataManager _metadataManager;
+
+  @BeforeMethod
+  public void setUp() {
+    DedupContext.Builder dedupContextBuider = new DedupContext.Builder();
+    
dedupContextBuider.setTableConfig(mock(TableConfig.class)).setSchema(mock(Schema.class))
+        
.setPrimaryKeyColumns(List.of("primaryKeyColumn")).setHashFunction(HashFunction.NONE)
+        .setMetadataTTL(METADATA_TTL).setDedupTimeColumn("dedupTimeColumn")
+        
.setTableIndexDir(mock(File.class)).setTableDataManager(mock(TableDataManager.class))
+        .setServerMetrics(mock(ServerMetrics.class));
+    DedupContext dedupContext = dedupContextBuider.build();
+    _metadataManager =
+        new 
ConcurrentMapPartitionDedupMetadataManager(DedupTestUtils.REALTIME_TABLE_NAME, 
0, dedupContext);
+  }
+
+  @Test
+  public void creatingMetadataManagerThrowsExceptions() {
+    DedupContext.Builder dedupContextBuider = new DedupContext.Builder();
+    
dedupContextBuider.setTableConfig(mock(TableConfig.class)).setSchema(mock(Schema.class))
+        
.setPrimaryKeyColumns(List.of("primaryKeyColumn")).setHashFunction(HashFunction.NONE).setMetadataTTL(1)
+        .setDedupTimeColumn(null).setTableIndexDir(mock(File.class))
+        
.setTableDataManager(mock(TableDataManager.class)).setServerMetrics(mock(ServerMetrics.class));
+    DedupContext dedupContext = dedupContextBuider.build();
+    assertThrows(IllegalArgumentException.class,
+        () -> new 
ConcurrentMapPartitionDedupMetadataManager(DedupTestUtils.REALTIME_TABLE_NAME, 
0,
+            dedupContext));
+  }
+
+  @Test
+  public void verifyRemoveExpiredPrimaryKeys() {
+    IndexSegment segment = Mockito.mock(IndexSegment.class);
+    for (int i = 0; i < 20; i++) {
+      double time = i * 1000;
+      Object primaryKeyKey = 
HashUtils.hashPrimaryKey(DedupTestUtils.getPrimaryKey(i), HashFunction.NONE);
+      
_metadataManager._primaryKeyToSegmentAndTimeMap.computeIfAbsent(primaryKeyKey, 
k -> Pair.of(segment, time));
+    }
+    _metadataManager._largestSeenTime.set(19000);
+    assertEquals(_metadataManager._primaryKeyToSegmentAndTimeMap.size(), 20);
+    verifyInMemoryState(0, 20, segment);
+
+    _metadataManager.removeExpiredPrimaryKeys();
+    assertEquals(_metadataManager.getNumPrimaryKeys(), 11);
+    assertEquals(_metadataManager._primaryKeyToSegmentAndTimeMap.size(), 11);
+    verifyInMemoryState(9, 11, segment);
+  }
+
+  @Test
+  public void verifyAddRemoveTheSameSegment() {
+    DedupUtils.DedupRecordInfoReader dedupRecordInfoReader = 
generateDedupRecordInfoReader(10, 0);
+    Iterator<DedupRecordInfo> dedupRecordInfoIterator =
+        DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader, 10);
+    IndexSegment segment = DedupTestUtils.mockSegment(1, 10);
+    _metadataManager.doAddOrReplaceSegment(null, segment, 
dedupRecordInfoIterator);
+    verifyInitialSegmentAddition(segment);
+
+    dedupRecordInfoIterator = 
DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader, 10);
+    _metadataManager.doRemoveSegment(segment, dedupRecordInfoIterator);
+    assertEquals(_metadataManager._primaryKeyToSegmentAndTimeMap.size(), 0);
+    assertEquals(_metadataManager._largestSeenTime.get(), 9000);
+  }
+
+  @Test
+  public void verifyAddingTwoSegmentWithSamePrimaryKeys() {
+    DedupUtils.DedupRecordInfoReader dedupRecordInfoReader = 
generateDedupRecordInfoReader(10, 0);
+    IndexSegment segment = DedupTestUtils.mockSegment(1, 10);
+    Iterator<DedupRecordInfo> dedupRecordInfoIterator =
+        DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader, 10);
+    _metadataManager.doAddOrReplaceSegment(null, segment, 
dedupRecordInfoIterator);
+    verifyInitialSegmentAddition(segment);
+
+    IndexSegment segment2 = DedupTestUtils.mockSegment(2, 10);
+    dedupRecordInfoIterator = 
DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader, 10);
+    _metadataManager.doAddOrReplaceSegment(segment, segment2, 
dedupRecordInfoIterator);
+    verifyInitialSegmentAddition(segment2);
+  }
+
+  @Test
+  public void verifyRemoveAnotherSegmentWithTheSamePrimaryKeys() {
+    DedupUtils.DedupRecordInfoReader dedupRecordInfoReader = 
generateDedupRecordInfoReader(10, 0);
+    IndexSegment segment = DedupTestUtils.mockSegment(1, 10);
+    Iterator<DedupRecordInfo> dedupRecordInfoIterator =
+        DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader, 10);
+    _metadataManager.doAddOrReplaceSegment(null, segment, 
dedupRecordInfoIterator);
+    verifyInitialSegmentAddition(segment);
+
+    IndexSegment segment2 = DedupTestUtils.mockSegment(2, 10);
+    dedupRecordInfoIterator = 
DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader, 10);
+    _metadataManager.doRemoveSegment(segment2, dedupRecordInfoIterator);
+    verifyInitialSegmentAddition(segment);
+  }
+
+  private void verifyInitialSegmentAddition(IndexSegment segment) {
+    assertEquals(_metadataManager._largestSeenTime.get(), 9000);
+    assertEquals(_metadataManager._primaryKeyToSegmentAndTimeMap.size(), 10);
+    verifyInMemoryState(0, 10, segment);
+  }
+
+  private void verifyInMemoryState(int startPrimaryKeyId, int recordCount, 
IndexSegment segment) {
+    for (int primaryKeyId = startPrimaryKeyId; primaryKeyId < 
startPrimaryKeyId + recordCount; primaryKeyId++) {
+      PrimaryKey primaryKey = DedupTestUtils.getPrimaryKey(primaryKeyId);
+      
assertEquals(_metadataManager._primaryKeyToSegmentAndTimeMap.get(primaryKey),
+          Pair.of(segment, (double) primaryKeyId * 1000));
+    }
+  }
+
+  @Test
+  public void verifyAddTwoDifferentSegmentsRemoveEarlySegmentFirst() {
+    DedupUtils.DedupRecordInfoReader dedupRecordInfoReader1 = 
generateDedupRecordInfoReader(10, 0);
+    IndexSegment segment1 = DedupTestUtils.mockSegment(1, 10);
+    Iterator<DedupRecordInfo> dedupRecordInfoIterator1 =
+        DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader1, 10);
+    _metadataManager.doAddOrReplaceSegment(null, segment1, 
dedupRecordInfoIterator1);
+    verifyInitialSegmentAddition(segment1);
+
+    DedupUtils.DedupRecordInfoReader dedupRecordInfoReader2 = 
generateDedupRecordInfoReader(10, 10);
+    IndexSegment segment2 = DedupTestUtils.mockSegment(2, 10);
+    Iterator<DedupRecordInfo> dedupRecordInfoIterator2 =
+        DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader2, 10);
+    _metadataManager.doAddOrReplaceSegment(null, segment2, 
dedupRecordInfoIterator2);
+    _metadataManager.removeExpiredPrimaryKeys();
+    assertEquals(_metadataManager.getNumPrimaryKeys(), 11);
+    assertEquals(_metadataManager._primaryKeyToSegmentAndTimeMap.size(), 11);
+    verifyInMemoryState(9, 1, segment1);
+    verifyInMemoryState(10, 10, segment2);
+    assertEquals(_metadataManager._largestSeenTime.get(), 19000);
+
+    dedupRecordInfoIterator1 = 
DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader1, 10);
+    _metadataManager.doRemoveSegment(segment1, dedupRecordInfoIterator1);
+    assertEquals(_metadataManager._primaryKeyToSegmentAndTimeMap.size(), 10);
+    verifyInMemoryState(10, 10, segment2);
+    assertEquals(_metadataManager._largestSeenTime.get(), 19000);
+
+    dedupRecordInfoIterator2 = 
DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader2, 10);
+    _metadataManager.doRemoveSegment(segment2, dedupRecordInfoIterator2);
+    assertTrue(_metadataManager._primaryKeyToSegmentAndTimeMap.isEmpty());
+    assertEquals(_metadataManager._largestSeenTime.get(), 19000);
+  }
+
+  @Test
+  public void verifyAddTwoDifferentSegmentsRemoveRecentSegmentFirst() {
+    DedupUtils.DedupRecordInfoReader dedupRecordInfoReader1 = 
generateDedupRecordInfoReader(10, 0);
+    IndexSegment segment1 = DedupTestUtils.mockSegment(1, 10);
+    Iterator<DedupRecordInfo> dedupRecordInfoIterator1 =
+        DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader1, 10);
+    _metadataManager.doAddOrReplaceSegment(null, segment1, 
dedupRecordInfoIterator1);
+    verifyInitialSegmentAddition(segment1);
+
+    DedupUtils.DedupRecordInfoReader dedupRecordInfoReader2 = 
generateDedupRecordInfoReader(10, 10);
+    IndexSegment segment2 = DedupTestUtils.mockSegment(2, 10);
+    Iterator<DedupRecordInfo> dedupRecordInfoIterator2 =
+        DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader2, 10);
+    _metadataManager.doAddOrReplaceSegment(null, segment2, 
dedupRecordInfoIterator2);
+    _metadataManager.removeExpiredPrimaryKeys();
+    assertEquals(_metadataManager.getNumPrimaryKeys(), 11);
+    assertEquals(_metadataManager._primaryKeyToSegmentAndTimeMap.size(), 11);
+    verifyInMemoryState(10, 10, segment2);
+    assertEquals(_metadataManager._largestSeenTime.get(), 19000);
+
+    dedupRecordInfoIterator2 = 
DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader2, 10);
+    _metadataManager.doRemoveSegment(segment2, dedupRecordInfoIterator2);
+    assertEquals(_metadataManager._primaryKeyToSegmentAndTimeMap.size(), 1);
+    verifyInMemoryState(9, 1, segment1);
+    assertEquals(_metadataManager._largestSeenTime.get(), 19000);
+
+    dedupRecordInfoIterator1 = 
DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader1, 10);
+    _metadataManager.doRemoveSegment(segment1, dedupRecordInfoIterator1);
+    assertTrue(_metadataManager._primaryKeyToSegmentAndTimeMap.isEmpty());
+    assertEquals(_metadataManager._largestSeenTime.get(), 19000);
+  }
+
+  @Test
+  public void verifyAddingSegmentWithDuplicatedPrimaryKeys() {
+    PrimaryKey primaryKey = DedupTestUtils.getPrimaryKey(0);
+    PrimaryKeyReader primaryKeyReader = Mockito.mock(PrimaryKeyReader.class);
+    for (int i = 0; i < 3; i++) {
+      Mockito.when(primaryKeyReader.getPrimaryKey(i)).thenReturn(primaryKey);
+    }
+    PinotSegmentColumnReader dedupTimeColumnReader = 
Mockito.mock(PinotSegmentColumnReader.class);
+    Mockito.when(dedupTimeColumnReader.getValue(0)).thenReturn(1000.0);
+    Mockito.when(dedupTimeColumnReader.getValue(1)).thenReturn(15000.0);
+    Mockito.when(dedupTimeColumnReader.getValue(2)).thenReturn(25000.0);
+    DedupUtils.DedupRecordInfoReader dedupRecordInfoReader =
+        new DedupUtils.DedupRecordInfoReader(primaryKeyReader, 
dedupTimeColumnReader);
+    _metadataManager._largestSeenTime.set(20000);
+
+    ImmutableSegmentImpl immutableSegment = DedupTestUtils.mockSegment(1, 3);
+    Iterator<DedupRecordInfo> dedupRecordInfoIterator = 
DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader, 3);
+    _metadataManager.doAddOrReplaceSegment(null, immutableSegment, 
dedupRecordInfoIterator);
+    assertEquals(_metadataManager._primaryKeyToSegmentAndTimeMap.size(), 1);
+    Object primaryKeyHash = HashUtils.hashPrimaryKey(primaryKey, 
HashFunction.NONE);
+    assertEquals(_metadataManager._primaryKeyToSegmentAndTimeMap.size(), 1);
+    
assertEquals(_metadataManager._primaryKeyToSegmentAndTimeMap.get(primaryKeyHash),
+        Pair.of(immutableSegment, 25000.0));
+    assertEquals(_metadataManager._largestSeenTime.get(), 25000);
+  }
+
+  @Test
+  public void verifyAddRow() {
+    _metadataManager._largestSeenTime.set(20000);
+
+    PrimaryKey primaryKey = DedupTestUtils.getPrimaryKey(0);
+    DedupRecordInfo dedupRecordInfo = new DedupRecordInfo(primaryKey, 1000);
+    ImmutableSegmentImpl immutableSegment = DedupTestUtils.mockSegment(1, 1);
+    assertFalse(_metadataManager.checkRecordPresentOrUpdate(dedupRecordInfo, 
immutableSegment));
+    assertFalse(_metadataManager._primaryKeyToSegmentAndTimeMap.isEmpty());
+    assertEquals(_metadataManager._largestSeenTime.get(), 20000);
+
+    Object primaryKeyHash = HashUtils.hashPrimaryKey(primaryKey, 
HashFunction.NONE);
+    dedupRecordInfo = new DedupRecordInfo(primaryKey, 15000);
+    assertTrue(_metadataManager.checkRecordPresentOrUpdate(dedupRecordInfo, 
immutableSegment));
+    assertEquals(_metadataManager._primaryKeyToSegmentAndTimeMap.size(), 1);
+    
assertEquals(_metadataManager._primaryKeyToSegmentAndTimeMap.get(primaryKeyHash),
+        Pair.of(immutableSegment, 1000.0));
+    assertEquals(_metadataManager._largestSeenTime.get(), 20000);
+
+    dedupRecordInfo = new DedupRecordInfo(primaryKey, 25000);
+    assertTrue(_metadataManager.checkRecordPresentOrUpdate(dedupRecordInfo, 
immutableSegment));
+    assertEquals(_metadataManager._primaryKeyToSegmentAndTimeMap.size(), 1);
+    
assertEquals(_metadataManager._primaryKeyToSegmentAndTimeMap.get(primaryKeyHash),
+        Pair.of(immutableSegment, 1000.0));
+    assertEquals(_metadataManager._largestSeenTime.get(), 25000);
+  }
+
+  private static DedupUtils.DedupRecordInfoReader 
generateDedupRecordInfoReader(int numberOfDocs,
+      int startPrimaryKeyValue) {
+    PrimaryKeyReader primaryKeyReader = Mockito.mock(PrimaryKeyReader.class);
+    PinotSegmentColumnReader dedupTimeColumnReader = 
Mockito.mock(PinotSegmentColumnReader.class);
+    for (int i = 0; i < numberOfDocs; i++) {
+      int primaryKeyValue = startPrimaryKeyValue + i;
+      
Mockito.when(primaryKeyReader.getPrimaryKey(i)).thenReturn(DedupTestUtils.getPrimaryKey(primaryKeyValue));
+      double time = primaryKeyValue * 1000;
+      Mockito.when(dedupTimeColumnReader.getValue(i)).thenReturn(time);
+    }
+    return new DedupUtils.DedupRecordInfoReader(primaryKeyReader, 
dedupTimeColumnReader);
+  }
+
+  @Test
+  public void testAddSegmentAfterStop() {
+    IndexSegment segment = DedupTestUtils.mockSegment(1, 10);
+    // throws when not stopped
+    assertThrows(RuntimeException.class, () -> 
_metadataManager.addSegment(segment));
+    _metadataManager.stop();
+    // do not throw when stopped
+    _metadataManager.addSegment(segment);
+  }
+}
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManagerWithoutTTLTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManagerWithoutTTLTest.java
new file mode 100644
index 0000000000..b27ec4ffb0
--- /dev/null
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManagerWithoutTTLTest.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.dedup;
+
+import java.io.File;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
+import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
+import org.apache.pinot.segment.local.segment.readers.PrimaryKeyReader;
+import org.apache.pinot.segment.local.utils.HashUtils;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.spi.config.table.HashFunction;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertSame;
+
+
+public class ConcurrentMapPartitionDedupMetadataManagerWithoutTTLTest {
+  private ConcurrentMapPartitionDedupMetadataManager _metadataManager;
+
+  @BeforeMethod
+  public void setUp() {
+    DedupContext.Builder dedupContextBuilder = new DedupContext.Builder();
+    
dedupContextBuilder.setTableConfig(mock(TableConfig.class)).setSchema(mock(Schema.class))
+        
.setPrimaryKeyColumns(List.of("primaryKeyColumn")).setHashFunction(HashFunction.NONE)
+        
.setTableIndexDir(mock(File.class)).setTableDataManager(mock(TableDataManager.class))
+        .setServerMetrics(mock(ServerMetrics.class));
+    DedupContext dedupContext = dedupContextBuilder.build();
+    _metadataManager =
+        new 
ConcurrentMapPartitionDedupMetadataManager(DedupTestUtils.REALTIME_TABLE_NAME, 
0, dedupContext);
+  }
+
+  @Test
+  public void verifyAddRemoveSegment() {
+    HashFunction hashFunction = HashFunction.NONE;
+
+    // Add the first segment
+    DedupUtils.DedupRecordInfoReader dedupRecordInfoReader = 
generateDedupRecordInfoReader();
+    Iterator<DedupRecordInfo> dedupRecordInfoIterator = 
DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader, 6);
+    ImmutableSegmentImpl segment1 = DedupTestUtils.mockSegment(1, 6);
+    _metadataManager.doAddOrReplaceSegment(null, segment1, 
dedupRecordInfoIterator);
+    
Assert.assertEquals(_metadataManager._primaryKeyToSegmentAndTimeMap.size(), 3);
+    checkRecordLocation(_metadataManager._primaryKeyToSegmentAndTimeMap, 0, 
segment1, hashFunction);
+    checkRecordLocation(_metadataManager._primaryKeyToSegmentAndTimeMap, 1, 
segment1, hashFunction);
+    checkRecordLocation(_metadataManager._primaryKeyToSegmentAndTimeMap, 2, 
segment1, hashFunction);
+
+    // reset the iterator
+    dedupRecordInfoIterator = 
DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader, 6);
+    _metadataManager.doRemoveSegment(segment1, dedupRecordInfoIterator);
+    
Assert.assertEquals(_metadataManager._primaryKeyToSegmentAndTimeMap.size(), 0);
+  }
+
+  @Test
+  public void verifyReloadSegment() {
+    HashFunction hashFunction = HashFunction.NONE;
+
+    // Add the first segment
+    DedupUtils.DedupRecordInfoReader dedupRecordInfoReader = 
generateDedupRecordInfoReader();
+    Iterator<DedupRecordInfo> dedupRecordInfoIterator = 
DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader, 6);
+    ImmutableSegmentImpl segment1 = DedupTestUtils.mockSegment(1, 6);
+    _metadataManager.doAddOrReplaceSegment(null, segment1, 
dedupRecordInfoIterator);
+
+    // Remove another segment with same PK rows
+    // reset the iterator
+    dedupRecordInfoIterator = 
DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader, 6);
+    ImmutableSegmentImpl segment2 = DedupTestUtils.mockSegment(1, 6);
+    _metadataManager.doRemoveSegment(segment2, dedupRecordInfoIterator);
+    
Assert.assertEquals(_metadataManager._primaryKeyToSegmentAndTimeMap.size(), 3);
+
+    // Keys should still exist
+    checkRecordLocation(_metadataManager._primaryKeyToSegmentAndTimeMap, 0, 
segment1, hashFunction);
+    checkRecordLocation(_metadataManager._primaryKeyToSegmentAndTimeMap, 1, 
segment1, hashFunction);
+    checkRecordLocation(_metadataManager._primaryKeyToSegmentAndTimeMap, 2, 
segment1, hashFunction);
+  }
+
+  @Test
+  public void verifyAddRow() {
+    HashFunction hashFunction = HashFunction.NONE;
+
+    // Add the first segment
+    DedupUtils.DedupRecordInfoReader dedupRecordInfoReader = 
generateDedupRecordInfoReader();
+    Iterator<DedupRecordInfo> dedupRecordInfoIterator = 
DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader, 6);
+    ImmutableSegmentImpl segment1 = DedupTestUtils.mockSegment(1, 6);
+    _metadataManager.doAddOrReplaceSegment(null, segment1, 
dedupRecordInfoIterator);
+
+    // Same PK exists
+    // reset the iterator
+    dedupRecordInfoIterator = 
DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader, 6);
+    ImmutableSegmentImpl segment2 = DedupTestUtils.mockSegment(2, 6);
+    while (dedupRecordInfoIterator.hasNext()) {
+      DedupRecordInfo dedupRecordInfo = dedupRecordInfoIterator.next();
+      
Assert.assertTrue(_metadataManager.checkRecordPresentOrUpdate(dedupRecordInfo, 
segment2));
+    }
+    checkRecordLocation(_metadataManager._primaryKeyToSegmentAndTimeMap, 0, 
segment1, hashFunction);
+    checkRecordLocation(_metadataManager._primaryKeyToSegmentAndTimeMap, 1, 
segment1, hashFunction);
+    checkRecordLocation(_metadataManager._primaryKeyToSegmentAndTimeMap, 2, 
segment1, hashFunction);
+
+    // New PK
+    Assert.assertFalse(_metadataManager.checkRecordPresentOrUpdate(
+            new DedupRecordInfo(DedupTestUtils.getPrimaryKey(3), 3000), 
segment2));
+    checkRecordLocation(_metadataManager._primaryKeyToSegmentAndTimeMap, 3, 
segment2, hashFunction);
+
+    // Same PK as the one recently ingested
+    Assert.assertTrue(_metadataManager.checkRecordPresentOrUpdate(
+        new DedupRecordInfo(DedupTestUtils.getPrimaryKey(3), 4000), segment2));
+  }
+
+  private static DedupUtils.DedupRecordInfoReader 
generateDedupRecordInfoReader() {
+    PrimaryKeyReader primaryKeyReader = Mockito.mock(PrimaryKeyReader.class);
+    PinotSegmentColumnReader dedupTimeColumnReader = 
Mockito.mock(PinotSegmentColumnReader.class);
+    
Mockito.when(primaryKeyReader.getPrimaryKey(0)).thenReturn(DedupTestUtils.getPrimaryKey(0));
+    
Mockito.when(primaryKeyReader.getPrimaryKey(1)).thenReturn(DedupTestUtils.getPrimaryKey(1));
+    
Mockito.when(primaryKeyReader.getPrimaryKey(2)).thenReturn(DedupTestUtils.getPrimaryKey(2));
+    
Mockito.when(primaryKeyReader.getPrimaryKey(3)).thenReturn(DedupTestUtils.getPrimaryKey(0));
+    
Mockito.when(primaryKeyReader.getPrimaryKey(4)).thenReturn(DedupTestUtils.getPrimaryKey(1));
+    
Mockito.when(primaryKeyReader.getPrimaryKey(5)).thenReturn(DedupTestUtils.getPrimaryKey(0));
+    for (int i = 0; i < 6; i++) {
+      Mockito.when(dedupTimeColumnReader.getValue(i)).thenReturn(i * 1000);
+    }
+    return new DedupUtils.DedupRecordInfoReader(primaryKeyReader, 
dedupTimeColumnReader);
+  }
+
+  private static void checkRecordLocation(Map<Object, Pair<IndexSegment, 
Double>> recordLocationMap, int keyValue,
+      IndexSegment segment, HashFunction hashFunction) {
+    IndexSegment indexSegment =
+        
recordLocationMap.get(HashUtils.hashPrimaryKey(DedupTestUtils.getPrimaryKey(keyValue),
 hashFunction)).getLeft();
+    assertNotNull(indexSegment);
+    assertSame(indexSegment, segment);
+  }
+}
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/DedupTestUtils.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/DedupTestUtils.java
new file mode 100644
index 0000000000..13d19fddc6
--- /dev/null
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/DedupTestUtils.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.dedup;
+
+import org.apache.pinot.common.utils.LLCSegmentName;
+import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
+import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.spi.data.readers.PrimaryKey;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class DedupTestUtils {
+  public static final String RAW_TABLE_NAME = "testTable";
+  public static final String REALTIME_TABLE_NAME = 
TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME);
+
+  private DedupTestUtils() {
+  }
+
+  public static ImmutableSegmentImpl mockSegment(int sequenceNumber, int 
totalDocs) {
+    // Mock the segment name
+    ImmutableSegmentImpl segment = mock(ImmutableSegmentImpl.class);
+    String segmentName = getSegmentName(sequenceNumber);
+    when(segment.getSegmentName()).thenReturn(segmentName);
+    // Mock the segment total doc
+    SegmentMetadataImpl segmentMetadata = mock(SegmentMetadataImpl.class);
+    when(segment.getSegmentMetadata()).thenReturn(segmentMetadata);
+    when(segmentMetadata.getTotalDocs()).thenReturn(totalDocs);
+    return segment;
+  }
+
+  public static String getSegmentName(int sequenceNumber) {
+    return new LLCSegmentName(RAW_TABLE_NAME, 0, sequenceNumber, 
System.currentTimeMillis()).toString();
+  }
+
+  public static PrimaryKey getPrimaryKey(int value) {
+    return new PrimaryKey(new Object[]{value});
+  }
+}
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManagerTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManagerTest.java
deleted file mode 100644
index 0fed3d59dd..0000000000
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManagerTest.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.segment.local.dedup;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import org.apache.pinot.common.metrics.ServerMetrics;
-import org.apache.pinot.common.utils.LLCSegmentName;
-import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
-import org.apache.pinot.segment.local.utils.HashUtils;
-import org.apache.pinot.segment.spi.IndexSegment;
-import org.apache.pinot.spi.config.table.HashFunction;
-import org.apache.pinot.spi.data.readers.PrimaryKey;
-import org.apache.pinot.spi.utils.builder.TableNameBuilder;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertSame;
-
-
-public class PartitionDedupMetadataManagerTest {
-  private static final String RAW_TABLE_NAME = "testTable";
-  private static final String REALTIME_TABLE_NAME = 
TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME);
-
-  @Test
-  public void verifyAddRemoveSegment() {
-    HashFunction hashFunction = HashFunction.NONE;
-    TestMetadataManager metadataManager =
-        new TestMetadataManager(REALTIME_TABLE_NAME, null, 0, 
mock(ServerMetrics.class), hashFunction);
-    Map<Object, IndexSegment> recordLocationMap = 
metadataManager._primaryKeyToSegmentMap;
-
-    // Add the first segment
-    List<PrimaryKey> pkList1 = new ArrayList<>();
-    pkList1.add(getPrimaryKey(0));
-    pkList1.add(getPrimaryKey(1));
-    pkList1.add(getPrimaryKey(2));
-    pkList1.add(getPrimaryKey(0));
-    pkList1.add(getPrimaryKey(1));
-    pkList1.add(getPrimaryKey(0));
-    metadataManager._primaryKeyIterator = pkList1.iterator();
-    ImmutableSegmentImpl segment1 = mockSegment(1);
-    metadataManager.addSegment(segment1);
-    checkRecordLocation(recordLocationMap, 0, segment1, hashFunction);
-    checkRecordLocation(recordLocationMap, 1, segment1, hashFunction);
-    checkRecordLocation(recordLocationMap, 2, segment1, hashFunction);
-
-    metadataManager._primaryKeyIterator = pkList1.iterator();
-    metadataManager.removeSegment(segment1);
-    Assert.assertEquals(recordLocationMap.size(), 0);
-  }
-
-  @Test
-  public void verifyReloadSegment() {
-    HashFunction hashFunction = HashFunction.NONE;
-    TestMetadataManager metadataManager =
-        new TestMetadataManager(REALTIME_TABLE_NAME, null, 0, 
mock(ServerMetrics.class), hashFunction);
-    Map<Object, IndexSegment> recordLocationMap = 
metadataManager._primaryKeyToSegmentMap;
-
-    // Add the first segment
-    List<PrimaryKey> pkList1 = new ArrayList<>();
-    pkList1.add(getPrimaryKey(0));
-    pkList1.add(getPrimaryKey(1));
-    pkList1.add(getPrimaryKey(2));
-    pkList1.add(getPrimaryKey(0));
-    pkList1.add(getPrimaryKey(1));
-    pkList1.add(getPrimaryKey(0));
-    metadataManager._primaryKeyIterator = pkList1.iterator();
-    ImmutableSegmentImpl segment1 = mockSegment(1);
-    metadataManager.addSegment(segment1);
-
-    // Remove another segment with same PK rows
-    metadataManager._primaryKeyIterator = pkList1.iterator();
-    ImmutableSegmentImpl segment2 = mockSegment(1);
-    metadataManager.removeSegment(segment2);
-    Assert.assertEquals(recordLocationMap.size(), 3);
-
-    // Keys should still exist
-    checkRecordLocation(recordLocationMap, 0, segment1, hashFunction);
-    checkRecordLocation(recordLocationMap, 1, segment1, hashFunction);
-    checkRecordLocation(recordLocationMap, 2, segment1, hashFunction);
-  }
-
-  @Test
-  public void verifyAddRow() {
-    HashFunction hashFunction = HashFunction.NONE;
-    TestMetadataManager metadataManager =
-        new TestMetadataManager(REALTIME_TABLE_NAME, null, 0, 
mock(ServerMetrics.class), hashFunction);
-    Map<Object, IndexSegment> recordLocationMap = 
metadataManager._primaryKeyToSegmentMap;
-
-    // Add the first segment
-    List<PrimaryKey> pkList1 = new ArrayList<>();
-    pkList1.add(getPrimaryKey(0));
-    pkList1.add(getPrimaryKey(1));
-    pkList1.add(getPrimaryKey(2));
-    pkList1.add(getPrimaryKey(0));
-    pkList1.add(getPrimaryKey(1));
-    pkList1.add(getPrimaryKey(0));
-    metadataManager._primaryKeyIterator = pkList1.iterator();
-    ImmutableSegmentImpl segment1 = mockSegment(1);
-    metadataManager.addSegment(segment1);
-
-    // Same PK exists
-    ImmutableSegmentImpl segment2 = mockSegment(2);
-    
Assert.assertTrue(metadataManager.checkRecordPresentOrUpdate(getPrimaryKey(0), 
segment2));
-    checkRecordLocation(recordLocationMap, 0, segment1, hashFunction);
-
-    // New PK
-    
Assert.assertFalse(metadataManager.checkRecordPresentOrUpdate(getPrimaryKey(3), 
segment2));
-    checkRecordLocation(recordLocationMap, 3, segment2, hashFunction);
-
-    // Same PK as the one recently ingested
-    
Assert.assertTrue(metadataManager.checkRecordPresentOrUpdate(getPrimaryKey(3), 
segment2));
-  }
-
-  private static ImmutableSegmentImpl mockSegment(int sequenceNumber) {
-    ImmutableSegmentImpl segment = mock(ImmutableSegmentImpl.class);
-    String segmentName = getSegmentName(sequenceNumber);
-    when(segment.getSegmentName()).thenReturn(segmentName);
-    return segment;
-  }
-
-  private static String getSegmentName(int sequenceNumber) {
-    return new LLCSegmentName(RAW_TABLE_NAME, 0, sequenceNumber, 
System.currentTimeMillis()).toString();
-  }
-
-  private static PrimaryKey getPrimaryKey(int value) {
-    return new PrimaryKey(new Object[]{value});
-  }
-
-  private static void checkRecordLocation(Map<Object, IndexSegment> 
recordLocationMap, int keyValue,
-      IndexSegment segment, HashFunction hashFunction) {
-    IndexSegment indexSegment = 
recordLocationMap.get(HashUtils.hashPrimaryKey(getPrimaryKey(keyValue), 
hashFunction));
-    assertNotNull(indexSegment);
-    assertSame(indexSegment, segment);
-  }
-
-  private static class TestMetadataManager extends 
ConcurrentMapPartitionDedupMetadataManager {
-    Iterator<PrimaryKey> _primaryKeyIterator;
-
-    TestMetadataManager(String tableNameWithType, List<String> 
primaryKeyColumns, int partitionId,
-        ServerMetrics serverMetrics, HashFunction hashFunction) {
-      super(tableNameWithType, primaryKeyColumns, partitionId, serverMetrics, 
hashFunction);
-    }
-
-    @Override
-    Iterator<PrimaryKey> getPrimaryKeyIterator(IndexSegment segment) {
-      return _primaryKeyIterator;
-    }
-  }
-}
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentDedupeTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentDedupeTest.java
index ebc092eb2b..36c5627902 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentDedupeTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentDedupeTest.java
@@ -19,9 +19,13 @@
 
 package org.apache.pinot.segment.local.indexsegment.mutable;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import java.io.File;
+import java.io.IOException;
 import java.net.URL;
 import java.util.Collections;
+import java.util.List;
+import java.util.Map;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
 import org.apache.pinot.segment.local.dedup.PartitionDedupMetadataManager;
@@ -30,6 +34,7 @@ import 
org.apache.pinot.segment.local.dedup.TableDedupMetadataManagerFactory;
 import org.apache.pinot.segment.local.recordtransformer.CompositeTransformer;
 import org.apache.pinot.spi.config.table.DedupConfig;
 import org.apache.pinot.spi.config.table.HashFunction;
+import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.Schema;
@@ -48,7 +53,7 @@ public class MutableSegmentDedupeTest {
   private static final String DATA_FILE_PATH = "data/test_dedup_data.json";
   private MutableSegmentImpl _mutableSegmentImpl;
 
-  private void setup(boolean dedupEnabled)
+  private void setup(boolean dedupEnabled, double metadataTTL, String 
dedupTimeColumn)
       throws Exception {
     URL schemaResourceUrl = 
this.getClass().getClassLoader().getResource(SCHEMA_FILE_PATH);
     URL dataResourceUrl = 
this.getClass().getClassLoader().getResource(DATA_FILE_PATH);
@@ -57,11 +62,13 @@ public class MutableSegmentDedupeTest {
         .setDedupConfig(new DedupConfig(dedupEnabled, 
HashFunction.NONE)).build();
     CompositeTransformer recordTransformer = 
CompositeTransformer.getDefaultTransformer(tableConfig, schema);
     File jsonFile = new File(dataResourceUrl.getFile());
+    DedupConfig dedupConfig = new DedupConfig(true, HashFunction.NONE, null, 
null, metadataTTL, dedupTimeColumn);
     PartitionDedupMetadataManager partitionDedupMetadataManager =
-        (dedupEnabled) ? 
getTableDedupMetadataManager(schema).getOrCreatePartitionManager(0) : null;
+        (dedupEnabled) ? getTableDedupMetadataManager(schema, 
dedupConfig).getOrCreatePartitionManager(0) : null;
     _mutableSegmentImpl =
         MutableSegmentImplTestUtils.createMutableSegmentImpl(schema, 
Collections.emptySet(), Collections.emptySet(),
-            Collections.emptySet(), false, true, null, "secondsSinceEpoch", 
null, partitionDedupMetadataManager);
+            Collections.emptySet(), false, true, null, "secondsSinceEpoch", 
null, dedupConfig,
+            partitionDedupMetadataManager);
     GenericRow reuse = new GenericRow();
     try (RecordReader recordReader = 
RecordReaderFactory.getRecordReader(FileFormat.JSON, jsonFile,
         schema.getColumnNames(), null)) {
@@ -69,30 +76,100 @@ public class MutableSegmentDedupeTest {
         recordReader.next(reuse);
         GenericRow transformedRow = recordTransformer.transform(reuse);
         _mutableSegmentImpl.index(transformedRow, null);
+        if (dedupEnabled) {
+          partitionDedupMetadataManager.removeExpiredPrimaryKeys();
+        }
         reuse.clear();
       }
     }
   }
 
-  private static TableDedupMetadataManager getTableDedupMetadataManager(Schema 
schema) {
+  private static TableDedupMetadataManager getTableDedupMetadataManager(Schema 
schema, DedupConfig dedupConfig) {
     TableConfig tableConfig = Mockito.mock(TableConfig.class);
     Mockito.when(tableConfig.getTableName()).thenReturn("testTable_REALTIME");
-    Mockito.when(tableConfig.getDedupConfig()).thenReturn(new 
DedupConfig(true, HashFunction.NONE));
-    return TableDedupMetadataManagerFactory.create(tableConfig, schema, 
Mockito.mock(TableDataManager.class),
+    Mockito.when(tableConfig.getDedupConfig()).thenReturn(dedupConfig);
+    SegmentsValidationAndRetentionConfig segmentsValidationAndRetentionConfig
+        = Mockito.mock(SegmentsValidationAndRetentionConfig.class);
+    
Mockito.when(tableConfig.getValidationConfig()).thenReturn(segmentsValidationAndRetentionConfig);
+    
Mockito.when(segmentsValidationAndRetentionConfig.getTimeColumnName()).thenReturn("secondsSinceEpoch");
+    TableDataManager tableDataManager = Mockito.mock(TableDataManager.class);
+    
Mockito.when(tableDataManager.getTableDataDir()).thenReturn(Mockito.mock(File.class));
+    return TableDedupMetadataManagerFactory.create(tableConfig, schema, 
tableDataManager,
         Mockito.mock(ServerMetrics.class));
   }
 
+  public List<Map<String, String>> loadJsonFile(String filePath) throws 
IOException {
+    URL resourceUrl = this.getClass().getClassLoader().getResource(filePath);
+    if (resourceUrl == null) {
+      throw new IllegalArgumentException("File not found: " + filePath);
+    }
+    File jsonFile = new File(resourceUrl.getFile());
+    ObjectMapper objectMapper = new ObjectMapper();
+    return objectMapper.readValue(jsonFile, List.class);
+  }
+
   @Test
   public void testDedupeEnabled()
       throws Exception {
-    setup(true);
+    setup(true, 0, null);
     Assert.assertEquals(_mutableSegmentImpl.getNumDocsIndexed(), 2);
+    List<Map<String, String>> rawData = loadJsonFile(DATA_FILE_PATH);
+    for (int i = 0; i < 2; i++) {
+      verifyGeneratedSegmentDataAgainstRawData(i, i, rawData);
+    }
   }
 
   @Test
   public void testDedupeDisabled()
       throws Exception {
-    setup(false);
+    setup(false, 0, null);
     Assert.assertEquals(_mutableSegmentImpl.getNumDocsIndexed(), 4);
+    List<Map<String, String>> rawData = loadJsonFile(DATA_FILE_PATH);
+    for (int i = 0; i < 4; i++) {
+      verifyGeneratedSegmentDataAgainstRawData(i, i, rawData);
+    }
+  }
+
+  @Test
+  public void testDedupWithMetadataTTLWithoutDedupTimeColumn()
+      throws Exception {
+    setup(true, 1000, null);
+    checkGeneratedSegmentDataWhenTableTimeColumnIsUsedAsDedupTimeColumn();
+  }
+
+  @Test
+  public void testDedupWithMetadataTTLWithTableTimeColumn()
+      throws Exception {
+    setup(true, 1000, "secondsSinceEpoch");
+    checkGeneratedSegmentDataWhenTableTimeColumnIsUsedAsDedupTimeColumn();
+  }
+
+  private void 
checkGeneratedSegmentDataWhenTableTimeColumnIsUsedAsDedupTimeColumn()
+      throws IOException {
+    Assert.assertEquals(_mutableSegmentImpl.getNumDocsIndexed(), 3);
+    List<Map<String, String>> rawData = loadJsonFile(DATA_FILE_PATH);
+    for (int i = 0; i < 2; i++) {
+      verifyGeneratedSegmentDataAgainstRawData(i, i, rawData);
+    }
+    verifyGeneratedSegmentDataAgainstRawData(2, 3, rawData);
+  }
+
+  @Test
+  public void testDedupWithMetadataTTLWithDedupTimeColumn()
+      throws Exception {
+    setup(true, 1000, "dedupTime");
+    Assert.assertEquals(_mutableSegmentImpl.getNumDocsIndexed(), 2);
+    List<Map<String, String>> rawData = loadJsonFile(DATA_FILE_PATH);
+    for (int i = 0; i < 2; i++) {
+      verifyGeneratedSegmentDataAgainstRawData(i, i, rawData);
+    }
+  }
+
+  private void verifyGeneratedSegmentDataAgainstRawData(
+      int docId, int rawDataIndex, List<Map<String, String>> rawData) {
+    for (String columnName : rawData.get(0).keySet()) {
+      Assert.assertEquals(String.valueOf(_mutableSegmentImpl.getValue(docId, 
columnName)),
+          String.valueOf(rawData.get(rawDataIndex).get(columnName)));
+    }
   }
 }
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java
index 6a8ca27480..b23f203ec7 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java
@@ -34,6 +34,7 @@ import 
org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentStatsHistory;
 import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
 import org.apache.pinot.segment.spi.index.DictionaryIndexConfig;
 import org.apache.pinot.segment.spi.index.StandardIndexes;
+import org.apache.pinot.spi.config.table.DedupConfig;
 import org.apache.pinot.spi.config.table.IndexConfig;
 import org.apache.pinot.spi.config.table.JsonIndexConfig;
 import org.apache.pinot.spi.config.table.UpsertConfig;
@@ -58,7 +59,7 @@ public class MutableSegmentImplTestUtils {
       Set<String> varLengthDictionaryColumns, Set<String> invertedIndexColumns,
       List<AggregationConfig> preAggregationConfigs) {
     return createMutableSegmentImpl(schema, noDictionaryColumns, 
varLengthDictionaryColumns, invertedIndexColumns,
-        Collections.emptyMap(), false, false, null, null, null, null, null, 
preAggregationConfigs);
+        Collections.emptyMap(), false, false, null, null, null, null, null, 
null, preAggregationConfigs);
   }
 
   public static MutableSegmentImpl createMutableSegmentImpl(Schema schema, 
Set<String> noDictionaryColumns,
@@ -71,31 +72,31 @@ public class MutableSegmentImplTestUtils {
       Set<String> varLengthDictionaryColumns, Set<String> 
invertedIndexColumns, boolean aggregateMetrics,
       boolean nullHandlingEnabled) {
     return createMutableSegmentImpl(schema, noDictionaryColumns, 
varLengthDictionaryColumns, invertedIndexColumns,
-        aggregateMetrics, nullHandlingEnabled, null, null, null, null);
+        aggregateMetrics, nullHandlingEnabled, null, null, null, null, null);
   }
 
   public static MutableSegmentImpl createMutableSegmentImpl(Schema schema, 
Set<String> noDictionaryColumns,
       Set<String> varLengthDictionaryColumns, Set<String> 
invertedIndexColumns, boolean aggregateMetrics,
       boolean nullHandlingEnabled, UpsertConfig upsertConfig, String 
timeColumnName,
-      PartitionUpsertMetadataManager partitionUpsertMetadataManager,
+      PartitionUpsertMetadataManager partitionUpsertMetadataManager, 
DedupConfig dedupConfig,
       PartitionDedupMetadataManager partitionDedupMetadataManager) {
     return createMutableSegmentImpl(schema, noDictionaryColumns, 
varLengthDictionaryColumns, invertedIndexColumns,
         Collections.emptyMap(), aggregateMetrics, nullHandlingEnabled, 
upsertConfig, timeColumnName,
-        partitionUpsertMetadataManager, partitionDedupMetadataManager, null, 
Collections.emptyList());
+        partitionUpsertMetadataManager, dedupConfig, 
partitionDedupMetadataManager, null, Collections.emptyList());
   }
 
   public static MutableSegmentImpl createMutableSegmentImpl(Schema schema, 
Set<String> noDictionaryColumns,
       Set<String> varLengthDictionaryColumns, Set<String> invertedIndexColumns,
       Map<String, JsonIndexConfig> jsonIndexConfigs, ServerMetrics 
serverMetrics) {
     return createMutableSegmentImpl(schema, noDictionaryColumns, 
varLengthDictionaryColumns, invertedIndexColumns,
-        jsonIndexConfigs, false, true, null, null, null, null, serverMetrics, 
Collections.emptyList());
+        jsonIndexConfigs, false, true, null, null, null, null, null, 
serverMetrics, Collections.emptyList());
   }
 
   public static MutableSegmentImpl createMutableSegmentImpl(Schema schema, 
Set<String> noDictionaryColumns,
       Set<String> varLengthDictionaryColumns, Set<String> invertedIndexColumns,
       Map<String, JsonIndexConfig> jsonIndexConfigs, boolean aggregateMetrics, 
boolean nullHandlingEnabled,
       UpsertConfig upsertConfig, String timeColumnName, 
PartitionUpsertMetadataManager partitionUpsertMetadataManager,
-      PartitionDedupMetadataManager partitionDedupMetadataManager, 
ServerMetrics serverMetrics,
+      DedupConfig dedupConfig, PartitionDedupMetadataManager 
partitionDedupMetadataManager, ServerMetrics serverMetrics,
       List<AggregationConfig> aggregationConfigs) {
 
     RealtimeSegmentStatsHistory statsHistory = 
mock(RealtimeSegmentStatsHistory.class);
@@ -106,6 +107,7 @@ public class MutableSegmentImplTestUtils {
     List<String> comparisonColumns = upsertConfig == null ? null : 
upsertConfig.getComparisonColumns();
     boolean isUpsertDropOutOfOrderRecord = upsertConfig == null ? false : 
upsertConfig.isDropOutOfOrderRecord();
     String upsertOutOfOrderRecordColumn = upsertConfig == null ? null : 
upsertConfig.getOutOfOrderRecordColumn();
+    String dedupTimeColumn = dedupConfig == null ? null : 
dedupConfig.getDedupTimeColumn();
     DictionaryIndexConfig varLengthDictConf = new DictionaryIndexConfig(false, 
true);
     RealtimeSegmentConfig.Builder segmentConfBuilder = new 
RealtimeSegmentConfig.Builder()
         
.setTableNameWithType(TABLE_NAME_WITH_TYPE).setSegmentName(SEGMENT_NAME)
@@ -119,10 +121,11 @@ public class MutableSegmentImplTestUtils {
         
.setAggregateMetrics(aggregateMetrics).setNullHandlingEnabled(nullHandlingEnabled).setUpsertMode(upsertMode)
         .setUpsertComparisonColumns(comparisonColumns)
         .setPartitionUpsertMetadataManager(partitionUpsertMetadataManager)
-        .setPartitionDedupMetadataManager(partitionDedupMetadataManager)
         .setIngestionAggregationConfigs(aggregationConfigs)
         .setUpsertDropOutOfOrderRecord(isUpsertDropOutOfOrderRecord)
         .setUpsertOutOfOrderRecordColumn(upsertOutOfOrderRecordColumn)
+        .setPartitionDedupMetadataManager(partitionDedupMetadataManager)
+        .setDedupTimeColumn(dedupTimeColumn)
         .setConsumerDir(TEMP_DIR.getAbsolutePath() + "/" + UUID.randomUUID() + 
"/consumerDir");
     for (Map.Entry<String, JsonIndexConfig> entry : 
jsonIndexConfigs.entrySet()) {
       segmentConfBuilder.setIndex(entry.getKey(), StandardIndexes.json(), 
entry.getValue());
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
index b7a73b238f..73ea233607 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
@@ -91,7 +91,7 @@ public class MutableSegmentImplUpsertComparisonColTest {
     _mutableSegmentImpl =
         MutableSegmentImplTestUtils.createMutableSegmentImpl(_schema, 
Collections.emptySet(), Collections.emptySet(),
             Collections.emptySet(), false, true, upsertConfig, 
"secondsSinceEpoch", _partitionUpsertMetadataManager,
-            null);
+            null, null);
     GenericRow reuse = new GenericRow();
     try (RecordReader recordReader = 
RecordReaderFactory.getRecordReader(FileFormat.JSON, jsonFile,
         _schema.getColumnNames(), null)) {
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
index cdc2fdbf77..4f46bd6d59 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
@@ -101,7 +101,7 @@ public class MutableSegmentImplUpsertTest {
     _mutableSegmentImpl =
         MutableSegmentImplTestUtils.createMutableSegmentImpl(_schema, 
Collections.emptySet(), Collections.emptySet(),
             Collections.emptySet(), false, true, upsertConfigWithHash, 
"secondsSinceEpoch",
-            _partitionUpsertMetadataManager, null);
+            _partitionUpsertMetadataManager, null, null);
 
     GenericRow reuse = new GenericRow();
     try (RecordReader recordReader = 
RecordReaderFactory.getRecordReader(FileFormat.JSON, jsonFile,
diff --git a/pinot-segment-local/src/test/resources/data/test_dedup_data.json 
b/pinot-segment-local/src/test/resources/data/test_dedup_data.json
index 11f9050f53..dfc7ad0889 100644
--- a/pinot-segment-local/src/test/resources/data/test_dedup_data.json
+++ b/pinot-segment-local/src/test/resources/data/test_dedup_data.json
@@ -2,21 +2,25 @@
   {
     "event_id": "aa",
     "description" : "first",
-    "secondsSinceEpoch": 1567205394
+    "dedupTime": 5000,
+    "secondsSinceEpoch": 5000
   },
   {
     "event_id": "bb",
     "description" : "first",
-    "secondsSinceEpoch": 1567205396
+    "dedupTime": 6000,
+    "secondsSinceEpoch": 1000
   },
   {
     "event_id": "aa",
     "description" : "second",
-    "secondsSinceEpoch": 1567205397
+    "dedupTime": 3000,
+    "secondsSinceEpoch": 5500
   },
   {
     "event_id": "bb",
     "description" : "second",
-    "secondsSinceEpoch": 1567205392
+    "dedupTime": 1000,
+    "secondsSinceEpoch": 7000
   }
 ]
diff --git a/pinot-segment-local/src/test/resources/data/test_dedup_schema.json 
b/pinot-segment-local/src/test/resources/data/test_dedup_schema.json
index 859c6f70c7..8c8b615916 100644
--- a/pinot-segment-local/src/test/resources/data/test_dedup_schema.json
+++ b/pinot-segment-local/src/test/resources/data/test_dedup_schema.json
@@ -8,6 +8,10 @@
     {
       "name": "description",
       "dataType": "STRING"
+    },
+    {
+      "name": "dedupTime",
+      "dataType": "LONG"
     }
   ],
   "timeFieldSpec": {
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/DedupConfig.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/DedupConfig.java
index 17ff1e271a..78d7b3b9f0 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/DedupConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/DedupConfig.java
@@ -20,25 +20,47 @@ package org.apache.pinot.spi.config.table;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyDescription;
+import java.util.Map;
 import org.apache.pinot.spi.config.BaseJsonConfig;
 
 public class DedupConfig extends BaseJsonConfig {
+  @JsonPropertyDescription("Whether dedup is enabled or not.")
   private final boolean _dedupEnabled;
+  @JsonPropertyDescription("Function to hash the primary key.")
   private final HashFunction _hashFunction;
+  @JsonPropertyDescription("Custom class for dedup metadata manager. If not 
specified, the default implementation "
+      + "ConcurrentMapTableDedupMetadataManager will be used.")
   private final String _metadataManagerClass;
+  @JsonPropertyDescription("Custom configs for dedup metadata manager.")
+  private final Map<String, String> _metadataManagerConfigs;
+  @JsonPropertyDescription("When larger than 0, use it for dedup metadata 
cleanup, it uses the same unit as the "
+      + "dedup time column. The metadata will be cleaned up when the dedup 
time is older than the current time "
+      + "minus metadata TTL. Notice that the metadata may not be cleaned up 
immediately after the TTL, it depends on "
+      + "the cleanup schedule.")
+  private final double _metadataTTL;
+  @JsonPropertyDescription("Time column used to calculate dedup metadata TTL. 
When it is not specified, the time column"
+      + " from the table config will be used.")
+  private final String _dedupTimeColumn;
 
   public DedupConfig(@JsonProperty(value = "dedupEnabled", required = true) 
boolean dedupEnabled,
       @JsonProperty(value = "hashFunction") HashFunction hashFunction) {
-    this(dedupEnabled, hashFunction, null);
+    this(dedupEnabled, hashFunction, null, null, 0, null);
   }
+
   @JsonCreator
   public DedupConfig(@JsonProperty(value = "dedupEnabled", required = true) 
boolean dedupEnabled,
       @JsonProperty(value = "hashFunction") HashFunction hashFunction,
-      @JsonProperty(value = "metadataManagerClass") String metadataManagerClass
-  ) {
+      @JsonProperty(value = "metadataManagerClass") String 
metadataManagerClass,
+      @JsonProperty(value = "metadataManagerConfigs") Map<String, String> 
metadataManagerConfigs,
+      @JsonProperty(value = "metadataTTL") double metadataTTL,
+      @JsonProperty(value = "dedupTimeColumn") String dedupTimeColumn) {
     _dedupEnabled = dedupEnabled;
     _hashFunction = hashFunction == null ? HashFunction.NONE : hashFunction;
     _metadataManagerClass = metadataManagerClass;
+    _metadataManagerConfigs = metadataManagerConfigs;
+    _metadataTTL = metadataTTL;
+    _dedupTimeColumn = dedupTimeColumn;
   }
 
   public HashFunction getHashFunction() {
@@ -52,4 +74,16 @@ public class DedupConfig extends BaseJsonConfig {
   public String getMetadataManagerClass() {
     return _metadataManagerClass;
   }
+
+  public Map<String, String> getMetadataManagerConfigs() {
+    return _metadataManagerConfigs;
+  }
+
+  public double getMetadataTTL() {
+    return _metadataTTL;
+  }
+
+  public String getDedupTimeColumn() {
+    return _dedupTimeColumn;
+  }
 }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
index b374855d59..90338cc7af 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
@@ -394,6 +394,16 @@ public class TableConfig extends BaseJsonConfig {
     return _upsertConfig == null ? 0 : _upsertConfig.getMetadataTTL();
   }
 
+  @JsonIgnore
+  public String getDedupTimeColumn() {
+    return _dedupConfig == null ? null : _dedupConfig.getDedupTimeColumn();
+  }
+
+  @JsonIgnore
+  public double getDedupMetadataTTL() {
+    return _dedupConfig == null ? 0 : _dedupConfig.getMetadataTTL();
+  }
+
   @JsonIgnore
   @Nullable
   public String getUpsertDeleteRecordColumn() {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to