This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 50ad111437d Add upsert support for offline tables (#17789)
50ad111437d is described below

commit 50ad111437dc1f3c913e1900ae24d28ce21ffe96
Author: Xiang Fu <[email protected]>
AuthorDate: Sat Mar 7 22:36:50 2026 -0800

    Add upsert support for offline tables (#17789)
    
    Enable primary-key-based deduplication (upsert) for OFFLINE tables,
    extending a capability previously limited to REALTIME tables. This
    allows batch-ingested data to leverage the same upsert semantics.
    
    Core changes:
    - Add a three-level comparison-column fallback for resolving record
      conflicts: configured comparison columns → time column → segment
      creation/push time (via ConstantComparisonColumnReader)
    - Extend OfflineTableDataManager with full upsert lifecycle: init,
      addSegment, replaceSegment, getSegmentContexts, and shutdown
    - Validate that offline upsert tables have SegmentPartitionConfig
      to ensure partition-aware segment assignment
    - Use zkMetadata-based partition ID lookup when available to avoid
      redundant ZK reads
    - Guard TTL code paths against empty comparison columns to prevent
      IndexOutOfBoundsException in segment-creation-time mode
    
    Refactoring:
    - Move shared upsert methods (handleUpsert, replaceUpsertSegment,
      registerSegment, setZkOperationTimeIfAvailable, getSegmentContexts,
      isUpsertEnabled, getPartitionToPrimaryKeyCount) from
      RealtimeTableDataManager into BaseTableDataManager
    - Add isUpsertEnabled(), getTableUpsertMetadataManager(), and
      getPartitionToPrimaryKeyCount() to the TableDataManager interface
      with default implementations
    - Remove instanceof checks in PrimaryKeyCount, TablesResource, and
      SingleTableExecutionInfo in favor of interface methods
    
    Testing:
    - Add OfflineUpsertTableIntegrationTest covering dedup query results,
      skipUpsert option, and segment replacement
    - Update TableConfigUtilsTest for new validation rules
    
    Co-authored-by: Claude Opus 4.6 <[email protected]>
---
 .../core/data/manager/BaseTableDataManager.java    | 133 +++++++++
 .../manager/offline/OfflineTableDataManager.java   |  39 +++
 .../manager/realtime/RealtimeTableDataManager.java | 133 +--------
 .../query/executor/SingleTableExecutionInfo.java   |  22 +-
 .../tests/OfflineUpsertTableIntegrationTest.java   | 306 +++++++++++++++++++++
 .../local/data/manager/TableDataManager.java       |  24 ++
 .../upsert/BasePartitionUpsertMetadataManager.java |  51 ++--
 .../upsert/BaseTableUpsertMetadataManager.java     |   9 +-
 ...oncurrentMapPartitionUpsertMetadataManager.java |   4 +-
 ...nUpsertMetadataManagerForConsistentDeletes.java |   4 +-
 .../pinot/segment/local/upsert/UpsertContext.java  |   2 +-
 .../pinot/segment/local/upsert/UpsertUtils.java    |  36 +++
 .../segment/local/utils/TableConfigUtils.java      |  40 ++-
 .../segment/local/utils/TableConfigUtilsTest.java  |   9 +-
 .../spi/index/metadata/SegmentMetadataImpl.java    |  26 +-
 .../server/api/resources/PrimaryKeyCount.java      |  12 +-
 .../pinot/server/api/resources/TablesResource.java |   8 +-
 17 files changed, 656 insertions(+), 202 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index 7458ca9d231..07dd67d376a 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -60,7 +60,9 @@ import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.metrics.ServerTimer;
 import org.apache.pinot.common.restlet.resources.SegmentErrorInfo;
 import org.apache.pinot.common.utils.ExceptionUtils;
+import org.apache.pinot.common.utils.SegmentUtils;
 import org.apache.pinot.common.utils.TarCompressionUtils;
+import org.apache.pinot.common.utils.config.QueryOptionsUtils;
 import org.apache.pinot.common.utils.config.TierConfigUtils;
 import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
 import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
@@ -78,6 +80,8 @@ import 
org.apache.pinot.segment.local.segment.index.loader.LoaderUtils;
 import 
org.apache.pinot.segment.local.segment.index.loader.invertedindex.MultiColumnTextIndexHandler;
 import org.apache.pinot.segment.local.startree.StarTreeBuilderUtils;
 import 
org.apache.pinot.segment.local.startree.v2.builder.StarTreeV2BuilderConfig;
+import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
+import org.apache.pinot.segment.local.upsert.TableUpsertMetadataManager;
 import org.apache.pinot.segment.local.utils.SegmentLocks;
 import org.apache.pinot.segment.local.utils.SegmentOperationsThrottler;
 import org.apache.pinot.segment.local.utils.SegmentOperationsThrottlerSet;
@@ -107,9 +111,11 @@ import 
org.apache.pinot.spi.config.table.MultiColumnTextIndexConfig;
 import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
 import org.apache.pinot.spi.config.table.StarTreeIndexConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.UpsertConfig;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -122,6 +128,7 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
 
   protected final ConcurrentHashMap<String, SegmentDataManager> 
_segmentDataManagerMap = new ConcurrentHashMap<>();
   protected final ServerMetrics _serverMetrics = ServerMetrics.get();
+  protected TableUpsertMetadataManager _tableUpsertMetadataManager;
 
   protected InstanceDataManagerConfig _instanceDataManagerConfig;
   protected String _instanceId;
@@ -784,9 +791,135 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
       Map<String, String> queryOptions) {
     List<SegmentContext> segmentContexts = new 
ArrayList<>(selectedSegments.size());
     selectedSegments.forEach(s -> segmentContexts.add(new SegmentContext(s)));
+    if (isUpsertEnabled() && !QueryOptionsUtils.isSkipUpsert(queryOptions)) {
+      _tableUpsertMetadataManager.setSegmentContexts(segmentContexts, 
queryOptions);
+    }
     return segmentContexts;
   }
 
+  @Override
+  public boolean isUpsertEnabled() {
+    return _tableUpsertMetadataManager != null;
+  }
+
+  @VisibleForTesting
+  @Override
+  public TableUpsertMetadataManager getTableUpsertMetadataManager() {
+    return _tableUpsertMetadataManager;
+  }
+
+  @Override
+  public Map<Integer, Long> getPartitionToPrimaryKeyCount() {
+    if (isUpsertEnabled()) {
+      return _tableUpsertMetadataManager.getPartitionToPrimaryKeyCount();
+    }
+    return Collections.emptyMap();
+  }
+
+  protected void handleUpsert(ImmutableSegment immutableSegment, @Nullable 
SegmentZKMetadata zkMetadata) {
+    String segmentName = immutableSegment.getSegmentName();
+    _logger.info("Adding immutable segment: {} with upsert enabled", 
segmentName);
+    Integer partitionId;
+    if (zkMetadata == null && 
TableNameBuilder.isOfflineTableResource(_tableNameWithType)) {
+      zkMetadata = 
ZKMetadataProvider.getSegmentZKMetadata(_helixManager.getHelixPropertyStore(), 
_tableNameWithType,
+          segmentName);
+    }
+    setZkOperationTimeIfAvailable(immutableSegment, zkMetadata);
+    if (TableNameBuilder.isOfflineTableResource(_tableNameWithType)) {
+      Preconditions.checkState(zkMetadata != null,
+          "Failed to find segment ZK metadata for segment: %s of OFFLINE 
table: %s", segmentName, _tableNameWithType);
+      partitionId = SegmentUtils.getSegmentPartitionId(zkMetadata, null);
+    } else {
+      partitionId = SegmentUtils.getSegmentPartitionId(segmentName, 
_tableNameWithType, _helixManager, null);
+    }
+
+    Preconditions.checkNotNull(partitionId,
+        "Failed to get partition id for segment: %s (upsert-enabled table: 
%s). "
+            + "Segment must follow a naming convention that encodes partition 
id (e.g. LLCSegmentName, "
+            + "UploadedRealtimeSegmentName), or have partition metadata 
configured via SegmentPartitionConfig.",
+        segmentName, _tableNameWithType);
+    PartitionUpsertMetadataManager partitionUpsertMetadataManager =
+        _tableUpsertMetadataManager.getOrCreatePartitionManager(partitionId);
+
+    _serverMetrics.addValueToTableGauge(_tableNameWithType, 
ServerGauge.DOCUMENT_COUNT,
+        immutableSegment.getSegmentMetadata().getTotalDocs());
+    _serverMetrics.addValueToTableGauge(_tableNameWithType, 
ServerGauge.SEGMENT_COUNT, 1L);
+    ImmutableSegmentDataManager newSegmentManager = new 
ImmutableSegmentDataManager(immutableSegment);
+    if (partitionUpsertMetadataManager.isPreloading()) {
+      // Register segment after it is preloaded and has initialized its 
validDocIds. The order of preloading and
+      // registering segment doesn't matter much as preloading happens before 
the table partition is ready for queries.
+      partitionUpsertMetadataManager.preloadSegment(immutableSegment);
+      registerSegment(segmentName, newSegmentManager, 
partitionUpsertMetadataManager);
+      _logger.info("Preloaded immutable segment: {} with upsert enabled", 
segmentName);
+      return;
+    }
+    SegmentDataManager oldSegmentManager = 
_segmentDataManagerMap.get(segmentName);
+    if (oldSegmentManager == null) {
+      // When adding a new segment, register it *before* 
partitionUpsertMetadataManager.addSegment() fully initializes
+      // the validDocIds bitmap. This lets queries access the new segment 
immediately while the bitmap is being built.
+      // Without early registration, docs in existing segments that get 
invalidated by this new segment would become
+      // invisible to queries until addSegment() completes.
+      registerSegment(segmentName, newSegmentManager, 
partitionUpsertMetadataManager);
+      partitionUpsertMetadataManager.trackNewlyAddedSegment(segmentName);
+      partitionUpsertMetadataManager.addSegment(immutableSegment);
+      _logger.info("Added new immutable segment: {} with upsert enabled", 
segmentName);
+    } else {
+      replaceUpsertSegment(segmentName, oldSegmentManager, newSegmentManager, 
partitionUpsertMetadataManager);
+    }
+  }
+
+  protected void replaceUpsertSegment(String segmentName, SegmentDataManager 
oldSegmentManager,
+      ImmutableSegmentDataManager newSegmentManager, 
PartitionUpsertMetadataManager partitionUpsertMetadataManager) {
+    IndexSegment oldSegment = oldSegmentManager.getSegment();
+    ImmutableSegment immutableSegment = newSegmentManager.getSegment();
+    UpsertConfig.ConsistencyMode consistencyMode = 
_tableUpsertMetadataManager.getContext().getConsistencyMode();
+    if (consistencyMode == UpsertConfig.ConsistencyMode.NONE) {
+      // When replacing a segment, register the new segment *after* 
replaceSegment() finishes filling its validDocIds
+      // bitmap. Otherwise queries lose access to valid docs in the old 
segment before the new bitmap is ready.
+      partitionUpsertMetadataManager.replaceSegment(immutableSegment, 
oldSegment);
+      registerSegment(segmentName, newSegmentManager, 
partitionUpsertMetadataManager);
+    } else {
+      // For consistency modes, keep both old and new segments visible to 
queries during replacement so that queries
+      // can see the new updates in the new segment while the old segment's 
validDocIds are still being updated.
+      // Register the new segment to the upsert metadata manager before making 
it visible to queries so the upsert
+      // view is updated before any query can access it.
+      SegmentDataManager duoSegmentDataManager = new 
DuoSegmentDataManager(newSegmentManager, oldSegmentManager);
+      registerSegment(segmentName, duoSegmentDataManager, 
partitionUpsertMetadataManager);
+      partitionUpsertMetadataManager.replaceSegment(immutableSegment, 
oldSegment);
+      registerSegment(segmentName, newSegmentManager, 
partitionUpsertMetadataManager);
+    }
+    _logger.info("Replaced {} segment: {} with upsert enabled and consistency 
mode: {}",
+        oldSegment instanceof ImmutableSegment ? "immutable" : "mutable", 
segmentName, consistencyMode);
+    oldSegmentManager.offload();
+    releaseSegment(oldSegmentManager);
+  }
+
+  protected void registerSegment(String segmentName, SegmentDataManager 
segmentDataManager,
+      @Nullable PartitionUpsertMetadataManager partitionUpsertMetadataManager) 
{
+    if (partitionUpsertMetadataManager != null) {
+      
partitionUpsertMetadataManager.trackSegmentForUpsertView(segmentDataManager.getSegment());
+    }
+    registerSegment(segmentName, segmentDataManager);
+  }
+
+  protected void setZkOperationTimeIfAvailable(ImmutableSegment segment, 
@Nullable SegmentZKMetadata zkMetadata) {
+    if (zkMetadata == null) {
+      return;
+    }
+    SegmentMetadata segmentMetadata = segment.getSegmentMetadata();
+    if (segmentMetadata instanceof SegmentMetadataImpl) {
+      SegmentMetadataImpl segmentMetadataImpl = (SegmentMetadataImpl) 
segmentMetadata;
+      if (zkMetadata.getCreationTime() > 0) {
+        segmentMetadataImpl.setZkCreationTime(zkMetadata.getCreationTime());
+      }
+      if (zkMetadata.getPushTime() > 0) {
+        segmentMetadataImpl.setZkPushTime(zkMetadata.getPushTime());
+      }
+      _logger.info("Set ZK creation time: {}, push time: {} for segment: {} in 
upsert table",
+          zkMetadata.getCreationTime(), zkMetadata.getPushTime(), 
zkMetadata.getSegmentName());
+    }
+  }
+
   private void reloadSegments(List<SegmentDataManager> segmentDataManagers, 
IndexLoadingConfig indexLoadingConfig,
       boolean forceDownload, String reloadJobId)
       throws Exception {
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/OfflineTableDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/OfflineTableDataManager.java
index a99712ec3bb..23105d54a38 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/OfflineTableDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/OfflineTableDataManager.java
@@ -18,11 +18,19 @@
  */
 package org.apache.pinot.core.data.manager.offline;
 
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.core.data.manager.BaseTableDataManager;
 import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.local.upsert.TableUpsertMetadataManagerFactory;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
 
 
 /**
@@ -33,6 +41,14 @@ public class OfflineTableDataManager extends 
BaseTableDataManager {
 
   @Override
   protected void doInit() {
+    Pair<TableConfig, Schema> tableConfigAndSchema = 
getCachedTableConfigAndSchema();
+    TableConfig tableConfig = tableConfigAndSchema.getLeft();
+    Schema schema = tableConfigAndSchema.getRight();
+    if (tableConfig.isUpsertEnabled()) {
+      _tableUpsertMetadataManager =
+          
TableUpsertMetadataManagerFactory.create(_instanceDataManagerConfig.getUpsertConfig(),
 tableConfig, schema,
+              this, _segmentOperationsThrottlerSet);
+    }
   }
 
   @Override
@@ -41,7 +57,17 @@ public class OfflineTableDataManager extends 
BaseTableDataManager {
 
   @Override
   protected void doShutdown() {
+    if (_tableUpsertMetadataManager != null) {
+      _tableUpsertMetadataManager.stop();
+    }
     releaseAndRemoveAllSegments();
+    if (_tableUpsertMetadataManager != null) {
+      try {
+        _tableUpsertMetadataManager.close();
+      } catch (IOException e) {
+        _logger.warn("Caught exception while closing upsert metadata manager", 
e);
+      }
+    }
   }
 
   protected void doAddOnlineSegment(String segmentName)
@@ -57,6 +83,19 @@ public class OfflineTableDataManager extends 
BaseTableDataManager {
     }
   }
 
+  @Override
+  public void addSegment(ImmutableSegment immutableSegment, @Nullable 
SegmentZKMetadata zkMetadata) {
+    String segmentName = immutableSegment.getSegmentName();
+    Preconditions.checkState(!_shutDown,
+        "Table data manager is already shut down, cannot add segment: %s to 
table: %s",
+        segmentName, _tableNameWithType);
+    if (isUpsertEnabled()) {
+      handleUpsert(immutableSegment, zkMetadata);
+      return;
+    }
+    super.addSegment(immutableSegment, zkMetadata);
+  }
+
   @Override
   public void addConsumingSegment(String segmentName) {
     throw new UnsupportedOperationException("Cannot add CONSUMING segment to 
OFFLINE table");
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index d4520a74284..3b606ebb949 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -53,10 +53,7 @@ import org.apache.pinot.common.metrics.ServerGauge;
 import org.apache.pinot.common.restlet.resources.SegmentErrorInfo;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.common.utils.SegmentUtils;
-import org.apache.pinot.common.utils.config.QueryOptionsUtils;
 import org.apache.pinot.core.data.manager.BaseTableDataManager;
-import org.apache.pinot.core.data.manager.DuoSegmentDataManager;
-import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
 import org.apache.pinot.core.util.PeerServerSegmentFinder;
 import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
 import org.apache.pinot.segment.local.dedup.PartitionDedupMetadataManager;
@@ -68,15 +65,11 @@ import 
org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentStatsHistory;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import 
org.apache.pinot.segment.local.segment.virtualcolumn.VirtualColumnProviderFactory;
 import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
-import org.apache.pinot.segment.local.upsert.TableUpsertMetadataManager;
 import org.apache.pinot.segment.local.upsert.TableUpsertMetadataManagerFactory;
 import org.apache.pinot.segment.local.utils.SchemaUtils;
 import org.apache.pinot.segment.local.utils.tablestate.TableStateUtils;
 import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.IndexSegment;
-import org.apache.pinot.segment.spi.SegmentContext;
-import org.apache.pinot.segment.spi.SegmentMetadata;
-import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.spi.config.table.IndexingConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.UpsertConfig;
@@ -150,7 +143,6 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
   private IngestionDelayTracker _ingestionDelayTracker;
 
   private TableDedupMetadataManager _tableDedupMetadataManager;
-  private TableUpsertMetadataManager _tableUpsertMetadataManager;
   private BooleanSupplier _isTableReadyToConsumeData;
   private boolean _enforceConsumptionInOrder = false;
 
@@ -372,17 +364,6 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
     _ingestionDelayTracker.stopTrackingPartition(segmentName);
   }
 
-  @Override
-  public List<SegmentContext> getSegmentContexts(List<IndexSegment> 
selectedSegments,
-      Map<String, String> queryOptions) {
-    List<SegmentContext> segmentContexts = new 
ArrayList<>(selectedSegments.size());
-    selectedSegments.forEach(s -> segmentContexts.add(new SegmentContext(s)));
-    if (isUpsertEnabled() && !QueryOptionsUtils.isSkipUpsert(queryOptions)) {
-      _tableUpsertMetadataManager.setSegmentContexts(segmentContexts, 
queryOptions);
-    }
-    return segmentContexts;
-  }
-
   /**
    *  Returns thread safe StreamMetadataProvider which is shared across 
different callers.
    */
@@ -450,10 +431,6 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
     return _tableDedupMetadataManager != null;
   }
 
-  public boolean isUpsertEnabled() {
-    return _tableUpsertMetadataManager != null;
-  }
-
   public boolean isPartialUpsertEnabled() {
     return _tableUpsertMetadataManager != null
         && _tableUpsertMetadataManager.getContext().getUpsertMode() == 
UpsertConfig.Mode.PARTIAL;
@@ -779,104 +756,6 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
     }
   }
 
-  private void handleUpsert(ImmutableSegment immutableSegment, @Nullable 
SegmentZKMetadata zkMetadata) {
-    String segmentName = immutableSegment.getSegmentName();
-    _logger.info("Adding immutable segment: {} with upsert enabled", 
segmentName);
-
-    // Set the ZK creation time so that same creation time can be used to 
break the comparison ties across replicas,
-    // to ensure data consistency of replica.
-    setZkCreationTimeIfAvailable(immutableSegment, zkMetadata);
-
-    Integer partitionId = SegmentUtils.getSegmentPartitionId(segmentName, 
_tableNameWithType, _helixManager, null);
-    Preconditions.checkNotNull(partitionId, "Failed to get partition id for 
segment: " + segmentName
-        + " (upsert-enabled table: " + _tableNameWithType + ")");
-    PartitionUpsertMetadataManager partitionUpsertMetadataManager =
-        _tableUpsertMetadataManager.getOrCreatePartitionManager(partitionId);
-
-    _serverMetrics.addValueToTableGauge(_tableNameWithType, 
ServerGauge.DOCUMENT_COUNT,
-        immutableSegment.getSegmentMetadata().getTotalDocs());
-    _serverMetrics.addValueToTableGauge(_tableNameWithType, 
ServerGauge.SEGMENT_COUNT, 1L);
-    ImmutableSegmentDataManager newSegmentManager = new 
ImmutableSegmentDataManager(immutableSegment);
-    if (partitionUpsertMetadataManager.isPreloading()) {
-      // Register segment after it is preloaded and has initialized its 
validDocIds. The order of preloading and
-      // registering segment doesn't matter much as preloading happens before 
table partition is ready for queries.
-      partitionUpsertMetadataManager.preloadSegment(immutableSegment);
-      registerSegment(segmentName, newSegmentManager, 
partitionUpsertMetadataManager);
-      _logger.info("Preloaded immutable segment: {} with upsert enabled", 
segmentName);
-      return;
-    }
-    SegmentDataManager oldSegmentManager = 
_segmentDataManagerMap.get(segmentName);
-    if (oldSegmentManager == null) {
-      // When adding a new segment, we should register it 'before' it is fully 
initialized by
-      // partitionUpsertMetadataManager. Because when processing docs in the 
new segment, the docs in the other
-      // segments may be invalidated, making the queries see less valid docs 
than expected. We should let query
-      // access the new segment asap even though its validDocId bitmap is 
still being filled by
-      // partitionUpsertMetadataManager.
-      registerSegment(segmentName, newSegmentManager, 
partitionUpsertMetadataManager);
-      partitionUpsertMetadataManager.trackNewlyAddedSegment(segmentName);
-      partitionUpsertMetadataManager.addSegment(immutableSegment);
-      _logger.info("Added new immutable segment: {} with upsert enabled", 
segmentName);
-    } else {
-      replaceUpsertSegment(segmentName, oldSegmentManager, newSegmentManager, 
partitionUpsertMetadataManager);
-    }
-  }
-
-  private void replaceUpsertSegment(String segmentName, SegmentDataManager 
oldSegmentManager,
-      ImmutableSegmentDataManager newSegmentManager, 
PartitionUpsertMetadataManager partitionUpsertMetadataManager) {
-    // When replacing a segment, we should register the new segment 'after' it 
is fully initialized by
-    // partitionUpsertMetadataManager to fill up its validDocId bitmap. 
Otherwise, the queries will lose the access
-    // to the valid docs in the old segment immediately, but the validDocId 
bitmap of the new segment is still
-    // being filled by partitionUpsertMetadataManager, making the queries see 
less valid docs than expected.
-    IndexSegment oldSegment = oldSegmentManager.getSegment();
-    ImmutableSegment immutableSegment = newSegmentManager.getSegment();
-    UpsertConfig.ConsistencyMode consistencyMode = 
_tableUpsertMetadataManager.getContext().getConsistencyMode();
-    if (consistencyMode == UpsertConfig.ConsistencyMode.NONE) {
-      partitionUpsertMetadataManager.replaceSegment(immutableSegment, 
oldSegment);
-      registerSegment(segmentName, newSegmentManager, 
partitionUpsertMetadataManager);
-    } else {
-      // By default, when replacing a segment, the old segment is kept intact 
and visible to query until the new
-      // segment is registered as in the if-branch above. But the newly 
ingested records will invalidate valid
-      // docs in the new segment as the upsert metadata gets updated during 
replacement, so the query will miss the
-      // new updates in the new segment, until it's registered after the 
replacement is done.
-      // For consistent data view, we make both old and new segment visible to 
the query and update both in place
-      // when segment replacement and new data ingestion are happening in 
parallel.
-      SegmentDataManager duoSegmentDataManager = new 
DuoSegmentDataManager(newSegmentManager, oldSegmentManager);
-      registerSegment(segmentName, duoSegmentDataManager, 
partitionUpsertMetadataManager);
-      partitionUpsertMetadataManager.replaceSegment(immutableSegment, 
oldSegment);
-      registerSegment(segmentName, newSegmentManager, 
partitionUpsertMetadataManager);
-    }
-    _logger.info("Replaced {} segment: {} with upsert enabled and consistency 
mode: {}",
-        oldSegment instanceof ImmutableSegment ? "immutable" : "mutable", 
segmentName, consistencyMode);
-    oldSegmentManager.offload();
-    releaseSegment(oldSegmentManager);
-  }
-
-  private void registerSegment(String segmentName, SegmentDataManager 
segmentDataManager,
-      @Nullable PartitionUpsertMetadataManager partitionUpsertMetadataManager) 
{
-    if (partitionUpsertMetadataManager != null) {
-      // Register segment to the upsert metadata manager before registering it 
to table manager, so that the upsert
-      // metadata manger can update the upsert view before the segment becomes 
visible to queries.
-      
partitionUpsertMetadataManager.trackSegmentForUpsertView(segmentDataManager.getSegment());
-    }
-    registerSegment(segmentName, segmentDataManager);
-  }
-
-  /**
-   * Sets the ZK creation time in the segment metadata if available, to ensure 
consistent
-   * creation times across replicas for upsert operations.
-   */
-  private void setZkCreationTimeIfAvailable(ImmutableSegment segment, 
@Nullable SegmentZKMetadata zkMetadata) {
-    if (zkMetadata != null && zkMetadata.getCreationTime() > 0) {
-      SegmentMetadata segmentMetadata = segment.getSegmentMetadata();
-      if (segmentMetadata instanceof SegmentMetadataImpl) {
-        SegmentMetadataImpl segmentMetadataImpl = (SegmentMetadataImpl) 
segmentMetadata;
-        segmentMetadataImpl.setZkCreationTime(zkMetadata.getCreationTime());
-        _logger.info("Set ZK creation time {} for segment: {} in upsert 
table", zkMetadata.getCreationTime(),
-            zkMetadata.getSegmentName());
-      }
-    }
-  }
-
   /**
    * Replaces the CONSUMING segment with a downloaded committed one.
    */
@@ -925,22 +804,12 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
     return _instanceId;
   }
 
-  @VisibleForTesting
-  public TableUpsertMetadataManager getTableUpsertMetadataManager() {
-    return _tableUpsertMetadataManager;
-  }
-
   @VisibleForTesting
   public TableDedupMetadataManager getTableDedupMetadataManager() {
     return _tableDedupMetadataManager;
   }
 
-  /**
-   * Retrieves a mapping of partition id to the primary key count for the 
partition.
-   * Supports both upsert and dedup enabled tables.
-   *
-   * @return A {@code Map} where keys are partition id and values are count of 
primary keys for that specific partition.
-   */
+  @Override
   public Map<Integer, Long> getPartitionToPrimaryKeyCount() {
     if (isUpsertEnabled()) {
       return _tableUpsertMetadataManager.getPartitionToPrimaryKeyCount();
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/SingleTableExecutionInfo.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/SingleTableExecutionInfo.java
index aba2be49adc..5c0949f017f 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/SingleTableExecutionInfo.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/SingleTableExecutionInfo.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.core.query.executor;
 
+import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -74,17 +75,18 @@ public class SingleTableExecutionInfo implements 
TableExecutionInfo {
     List<IndexSegment> indexSegments;
     Map<IndexSegment, SegmentContext> providedSegmentContexts = null;
 
-    if (!isUpsertTable(tableDataManager)) {
+    if (!tableDataManager.isUpsertEnabled()) {
       segmentDataManagers = tableDataManager.acquireSegments(segmentsToQuery, 
optionalSegments, notAcquiredSegments);
       indexSegments = new ArrayList<>(segmentDataManagers.size());
       for (SegmentDataManager segmentDataManager : segmentDataManagers) {
         indexSegments.add(segmentDataManager.getSegment());
       }
     } else {
-      RealtimeTableDataManager rtdm = (RealtimeTableDataManager) 
tableDataManager;
-      TableUpsertMetadataManager tumm = rtdm.getTableUpsertMetadataManager();
+      TableUpsertMetadataManager tumm = 
tableDataManager.getTableUpsertMetadataManager();
+      Preconditions.checkState(tumm != null,
+          "TableUpsertMetadataManager is null for upsert-enabled table: %s", 
tableNameWithType);
       boolean isUsingConsistencyMode =
-          
rtdm.getTableUpsertMetadataManager().getContext().getConsistencyMode() != 
UpsertConfig.ConsistencyMode.NONE;
+          tumm.getContext().getConsistencyMode() != 
UpsertConfig.ConsistencyMode.NONE;
       if (isUsingConsistencyMode) {
         tumm.lockForSegmentContexts();
       }
@@ -128,18 +130,6 @@ public class SingleTableExecutionInfo implements 
TableExecutionInfo {
         segmentsToQuery, optionalSegments, notAcquiredSegments);
   }
 
-  private static boolean isUpsertTable(TableDataManager tableDataManager) {
-    // For upsert table, the server can start to process newly added segments 
before brokers can add those segments
-    // into their routing tables, like newly created consuming segment or 
newly uploaded segments. We should include
-    // those segments in the list of segments for query to process on the 
server, otherwise, the query will see less
-    // than expected valid docs from the upsert table.
-    if (tableDataManager instanceof RealtimeTableDataManager) {
-      RealtimeTableDataManager rtdm = (RealtimeTableDataManager) 
tableDataManager;
-      return rtdm.isUpsertEnabled();
-    }
-    return false;
-  }
-
   private SingleTableExecutionInfo(TableDataManager tableDataManager, 
List<SegmentDataManager> segmentDataManagers,
       List<IndexSegment> indexSegments, Map<IndexSegment, SegmentContext> 
providedSegmentContexts,
       List<String> segmentsToQuery, List<String> optionalSegments, 
List<String> notAcquiredSegments) {
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineUpsertTableIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineUpsertTableIntegrationTest.java
new file mode 100644
index 00000000000..7dbb755745d
--- /dev/null
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineUpsertTableIntegrationTest.java
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.client.ResultSet;
+import org.apache.pinot.common.utils.TarCompressionUtils;
+import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
+import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig;
+import org.apache.pinot.spi.config.table.RoutingConfig;
+import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.UpsertConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.util.TestUtils;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+/**
+ * Integration test for offline table upsert support.
+ *
+ * Tests that OFFLINE tables with upsert enabled correctly deduplicate records 
by primary key,
+ * keeping the latest record based on the comparison column (time column).
+ *
+ * Test data layout (t1 = 1671036400000, t2 = 1681036400000):
+ *   Segment "segment_0" (partition 0): playerId=100 (score=2000, ts=t1), playerId=101 (score=3000, ts=t1)
+ *   Segment "segment_1" (partition 0): playerId=100 (score=2500, ts=t2), playerId=102 (score=4000, ts=t1)
+ *   Segment "segment_2" (partition 0): playerId=101 (score=3500, ts=t2), playerId=102 (score=4500, ts=t2)
+ *
+ * After upsert dedup (latest by timestampInEpoch):
+ *   playerId=100 -> score=2500 (from segment_1, ts=t2)
+ *   playerId=101 -> score=3500 (from segment_2, ts=t2)
+ *   playerId=102 -> score=4500 (from segment_2, ts=t2)
+ */
+public class OfflineUpsertTableIntegrationTest extends 
BaseClusterIntegrationTest {
+  private static final String TABLE_NAME = "offlineUpsertTest";
+  private static final String PRIMARY_KEY_COL = "playerId";
+  private static final String TIME_COL_NAME = "timestampInEpoch";
+  private static final int NUM_PARTITIONS = 1;
+  private static final int TOTAL_RAW_RECORDS = 6;
+  private static final int UNIQUE_PRIMARY_KEYS = 3;
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
+
+    // Start the Pinot cluster
+    startZk();
+    startController();
+    startBroker();
+    startServer();
+
+    // Create and upload schema
+    Schema schema = createUpsertSchema();
+    addSchema(schema);
+
+    // Create OFFLINE table config with upsert enabled
+    TableConfig tableConfig = createOfflineUpsertTableConfig();
+    addTableConfig(tableConfig);
+
+    // Build and upload segments with overlapping primary keys
+    buildAndUploadTestSegments(tableConfig, schema);
+
+    // Wait for all documents to load
+    waitForAllDocsLoaded(600_000L);
+  }
+
+  @AfterClass
+  public void tearDown()
+      throws IOException {
+    dropOfflineTable(TABLE_NAME);
+    stopServer();
+    stopBroker();
+    stopController();
+    stopZk();
+    FileUtils.deleteDirectory(_tempDir);
+  }
+
+  @Override
+  protected String getTableName() {
+    return TABLE_NAME;
+  }
+
+  @Nullable
+  @Override
+  protected String getTimeColumnName() {
+    return TIME_COL_NAME;
+  }
+
+  @Override
+  protected long getCountStarResult() {
+    return UNIQUE_PRIMARY_KEYS;
+  }
+
+  @Override
+  protected void waitForAllDocsLoaded(long timeoutMs)
+      throws Exception {
+    TestUtils.waitForCondition(aVoid -> {
+      try {
+        return queryCountStarWithoutUpsert() == TOTAL_RAW_RECORDS;
+      } catch (Exception e) {
+        return null;
+      }
+    }, 100L, timeoutMs, "Failed to load all documents");
+    assertEquals(getCurrentCountStarResult(), getCountStarResult());
+  }
+
+  /**
+   * Tests that COUNT(*) returns only unique primary keys (deduplication is 
working).
+   */
+  @Test
+  public void testUpsertQueryResults()
+      throws Exception {
+    // With upsert: should return only 3 unique primary keys
+    long upsertCount = getCurrentCountStarResult();
+    assertEquals(upsertCount, UNIQUE_PRIMARY_KEYS,
+        "Expected " + UNIQUE_PRIMARY_KEYS + " unique records after upsert 
dedup");
+
+    // Without upsert: should return all 6 raw records
+    long rawCount = queryCountStarWithoutUpsert();
+    assertEquals(rawCount, TOTAL_RAW_RECORDS,
+        "Expected " + TOTAL_RAW_RECORDS + " raw records with skipUpsert=true");
+
+    // Verify the latest records are returned (by checking scores)
+    ResultSet rs = getPinotConnection().execute(
+        "SELECT playerId, score FROM " + TABLE_NAME + " ORDER BY 
playerId").getResultSet(0);
+    assertEquals(rs.getRowCount(), UNIQUE_PRIMARY_KEYS);
+
+    // playerId=100 -> score=2500 (latest from segment 2)
+    assertEquals(rs.getInt(0, 0), 100);
+    assertEquals(rs.getFloat(0, 1), 2500.0f, 0.01f);
+
+    // playerId=101 -> score=3500 (latest from segment 3)
+    assertEquals(rs.getInt(1, 0), 101);
+    assertEquals(rs.getFloat(1, 1), 3500.0f, 0.01f);
+
+    // playerId=102 -> score=4500 (latest from segment 3)
+    assertEquals(rs.getInt(2, 0), 102);
+    assertEquals(rs.getFloat(2, 1), 4500.0f, 0.01f);
+  }
+
+  /**
+   * Tests that uploading a new segment with updated records replaces older 
values.
+   */
+  @Test(dependsOnMethods = "testUpsertQueryResults")
+  public void testSegmentReplacement()
+      throws Exception {
+    Schema schema = createUpsertSchema();
+    TableConfig tableConfig = getOfflineTableConfig();
+
+    // Build a new segment with updated values for playerId=100
+    List<GenericRow> rows = new ArrayList<>();
+    GenericRow row = new GenericRow();
+    row.putValue(PRIMARY_KEY_COL, 100);
+    row.putValue("name", "UpdatedPlayer");
+    row.putValue("game", "chess");
+    row.putValue("score", 9999.0f);
+    row.putValue(TIME_COL_NAME, 1691036400000L);
+    rows.add(row);
+
+    File newSegmentDir = new File(_tempDir, "newSegmentDir");
+    File newTarDir = new File(_tempDir, "newTarDir");
+    TestUtils.ensureDirectoriesExistAndEmpty(newSegmentDir, newTarDir);
+    buildSegment(tableConfig, schema, "segment_update_0", rows, newSegmentDir, 
newTarDir);
+    uploadSegments(TABLE_NAME, newTarDir);
+
+    // Wait for the new segment to load (7 total raw records now)
+    TestUtils.waitForCondition(aVoid -> {
+      try {
+        return queryCountStarWithoutUpsert() == TOTAL_RAW_RECORDS + 1;
+      } catch (Exception e) {
+        return null;
+      }
+    }, 100L, 600_000L, "Failed to load updated segment");
+
+    // Verify upsert still returns 3 unique primary keys
+    assertEquals(getCurrentCountStarResult(), UNIQUE_PRIMARY_KEYS);
+
+    // Verify playerId=100 now has the updated score
+    ResultSet rs = getPinotConnection().execute(
+        "SELECT score FROM " + TABLE_NAME + " WHERE playerId = 
100").getResultSet(0);
+    assertEquals(rs.getRowCount(), 1);
+    assertEquals(rs.getFloat(0, 0), 9999.0f, 0.01f);
+  }
+
+  private Schema createUpsertSchema() {
+    return new Schema.SchemaBuilder()
+        .setSchemaName(TABLE_NAME)
+        .addSingleValueDimension(PRIMARY_KEY_COL, FieldSpec.DataType.INT)
+        .addSingleValueDimension("name", FieldSpec.DataType.STRING)
+        .addSingleValueDimension("game", FieldSpec.DataType.STRING)
+        .addMetric("score", FieldSpec.DataType.FLOAT)
+        .addDateTime(TIME_COL_NAME, FieldSpec.DataType.LONG, 
"1:MILLISECONDS:EPOCH", "1:MILLISECONDS")
+        .setPrimaryKeyColumns(List.of(PRIMARY_KEY_COL))
+        .build();
+  }
+
+  private TableConfig createOfflineUpsertTableConfig() {
+    UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+
+    Map<String, ColumnPartitionConfig> columnPartitionConfigMap = new 
HashMap<>();
+    columnPartitionConfigMap.put(PRIMARY_KEY_COL, new 
ColumnPartitionConfig("Murmur", NUM_PARTITIONS));
+
+    return new TableConfigBuilder(TableType.OFFLINE)
+        .setTableName(TABLE_NAME)
+        .setTimeColumnName(TIME_COL_NAME)
+        .setUpsertConfig(upsertConfig)
+        .setRoutingConfig(new RoutingConfig(null, null,
+            RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
+        .setSegmentPartitionConfig(new 
SegmentPartitionConfig(columnPartitionConfigMap))
+        .setReplicaGroupStrategyConfig(new 
ReplicaGroupStrategyConfig(PRIMARY_KEY_COL, 1))
+        .build();
+  }
+
+  private void buildAndUploadTestSegments(TableConfig tableConfig, Schema 
schema)
+      throws Exception {
+    // Segment 1: playerId=100 (score=2000, ts=1671036400000), playerId=101 
(score=3000, ts=1671036400000)
+    List<GenericRow> segment1Rows = new ArrayList<>();
+    segment1Rows.add(createRow(100, "Alice", "chess", 2000.0f, 
1671036400000L));
+    segment1Rows.add(createRow(101, "Bob", "chess", 3000.0f, 1671036400000L));
+    buildSegment(tableConfig, schema, "segment_0", segment1Rows, _segmentDir, 
_tarDir);
+
+    // Segment 2: playerId=100 (score=2500, ts=1681036400000), playerId=102 
(score=4000, ts=1671036400000)
+    List<GenericRow> segment2Rows = new ArrayList<>();
+    segment2Rows.add(createRow(100, "Alice", "chess", 2500.0f, 
1681036400000L));
+    segment2Rows.add(createRow(102, "Charlie", "chess", 4000.0f, 
1671036400000L));
+    buildSegment(tableConfig, schema, "segment_1", segment2Rows, _segmentDir, 
_tarDir);
+
+    // Segment 3: playerId=101 (score=3500, ts=1681036400000), playerId=102 
(score=4500, ts=1681036400000)
+    List<GenericRow> segment3Rows = new ArrayList<>();
+    segment3Rows.add(createRow(101, "Bob", "chess", 3500.0f, 1681036400000L));
+    segment3Rows.add(createRow(102, "Charlie", "chess", 4500.0f, 
1681036400000L));
+    buildSegment(tableConfig, schema, "segment_2", segment3Rows, _segmentDir, 
_tarDir);
+
+    uploadSegments(TABLE_NAME, _tarDir);
+  }
+
+  private GenericRow createRow(int playerId, String name, String game, float 
score, long timestamp) {
+    GenericRow row = new GenericRow();
+    row.putValue(PRIMARY_KEY_COL, playerId);
+    row.putValue("name", name);
+    row.putValue("game", game);
+    row.putValue("score", score);
+    row.putValue(TIME_COL_NAME, timestamp);
+    return row;
+  }
+
+  private void buildSegment(TableConfig tableConfig, Schema schema, String 
segmentName,
+      List<GenericRow> rows, File segmentDir, File tarDir)
+      throws Exception {
+    SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, 
schema);
+    config.setOutDir(segmentDir.getPath());
+    config.setTableName(tableConfig.getTableName());
+    config.setSegmentName(segmentName);
+
+    SegmentIndexCreationDriverImpl driver = new 
SegmentIndexCreationDriverImpl();
+    RecordReader recordReader = new GenericRowRecordReader(rows);
+    driver.init(config, recordReader);
+    driver.build();
+
+    File indexDir = new File(segmentDir, segmentName);
+    File segmentTarFile = new File(tarDir, segmentName + 
TarCompressionUtils.TAR_GZ_FILE_EXTENSION);
+    TarCompressionUtils.createCompressedTarFile(indexDir, segmentTarFile);
+  }
+
+  private long queryCountStarWithoutUpsert() {
+    return getPinotConnection().execute(
+        "SELECT COUNT(*) FROM " + TABLE_NAME + " 
OPTION(skipUpsert=true)").getResultSet(0).getLong(0);
+  }
+}
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
index b52a69ab363..903eeb9aba3 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
@@ -21,6 +21,7 @@ package org.apache.pinot.segment.local.data.manager;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.cache.Cache;
 import java.io.File;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
@@ -32,6 +33,7 @@ import org.apache.helix.HelixManager;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.restlet.resources.SegmentErrorInfo;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.local.upsert.TableUpsertMetadataManager;
 import org.apache.pinot.segment.local.utils.SegmentLocks;
 import org.apache.pinot.segment.local.utils.SegmentOperationsThrottlerSet;
 import org.apache.pinot.segment.local.utils.SegmentReloadSemaphore;
@@ -358,4 +360,26 @@ public interface TableDataManager {
    * @return List of {@link StaleSegment} with segment names and reason why it 
is stale
    */
   List<StaleSegment> getStaleSegments();
+
+  /**
+   * Returns whether upsert is enabled for this table.
+   */
+  default boolean isUpsertEnabled() {
+    return false;
+  }
+
+  /**
+   * Returns the table upsert metadata manager if upsert is enabled, null 
otherwise.
+   */
+  @Nullable
+  default TableUpsertMetadataManager getTableUpsertMetadataManager() {
+    return null;
+  }
+
+  /**
+   * Returns a mapping of partition id to primary key count. Supports both 
upsert and dedup enabled tables.
+   */
+  default Map<Integer, Long> getPartitionToPrimaryKeyCount() {
+    return Collections.emptyMap();
+  }
 }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
index ab414dd6ede..aa43d89f819 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -62,10 +62,12 @@ import 
org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import 
org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
 import org.apache.pinot.spi.config.table.HashFunction;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.table.UpsertConfig;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.PrimaryKey;
 import org.apache.pinot.spi.utils.BooleanUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.roaringbitmap.PeekableIntIterator;
 import org.roaringbitmap.buffer.MutableRoaringBitmap;
 import org.slf4j.Logger;
@@ -79,6 +81,7 @@ public abstract class BasePartitionUpsertMetadataManager 
implements PartitionUps
   protected static final double TTL_WATERMARK_NOT_SET = 
Double.NEGATIVE_INFINITY;
 
   protected final String _tableNameWithType;
+  protected final TableType _tableType;
   protected final int _partitionId;
   protected final UpsertContext _context;
   protected final List<String> _primaryKeyColumns;
@@ -138,6 +141,7 @@ public abstract class BasePartitionUpsertMetadataManager 
implements PartitionUps
 
   protected BasePartitionUpsertMetadataManager(String tableNameWithType, int 
partitionId, UpsertContext context) {
     _tableNameWithType = tableNameWithType;
+    _tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
     _partitionId = partitionId;
     _context = context;
     _primaryKeyColumns = context.getPrimaryKeyColumns();
@@ -299,6 +303,18 @@ public abstract class BasePartitionUpsertMetadataManager 
implements PartitionUps
     }
   }
 
+  /**
+   * Creates a RecordInfoReader for the given segment. When comparison columns 
are configured, reads comparison values
+   * from the columns. When comparison columns are empty, uses the segment's 
authoritative update/creation time (ZK push
+   * time for offline tables, falling back to ZK creation time) as a constant 
comparison value for all records.
+   */
+  protected UpsertUtils.RecordInfoReader createRecordInfoReader(IndexSegment 
segment) {
+    if (_comparisonColumns.isEmpty()) {
+      long segmentCreationTime = getAuthoritativeUpdateOrCreationTime(segment);
+      return new UpsertUtils.RecordInfoReader(segment, _primaryKeyColumns, 
segmentCreationTime, _deleteRecordColumn);
+    }
+    return new UpsertUtils.RecordInfoReader(segment, _primaryKeyColumns, 
_comparisonColumns, _deleteRecordColumn);
+  }
+
   protected boolean isTTLEnabled() {
     return _metadataTTL > 0 || _deletedKeysTTL > 0;
   }
@@ -351,7 +367,7 @@ public abstract class BasePartitionUpsertMetadataManager 
implements PartitionUps
   protected void doAddSegment(ImmutableSegmentImpl segment) {
     String segmentName = segment.getSegmentName();
     _logger.info("Adding segment: {}, current primary key count: {}", 
segmentName, getNumPrimaryKeys());
-    if (isTTLEnabled()) {
+    if (isTTLEnabled() && !_comparisonColumns.isEmpty()) {
       double maxComparisonValue = getMaxComparisonValue(segment);
       _largestSeenComparisonValue.getAndUpdate(v -> Math.max(v, 
maxComparisonValue));
       if (isOutOfMetadataTTL(maxComparisonValue) && 
skipAddSegmentOutOfTTL(segment)) {
@@ -362,8 +378,7 @@ public abstract class BasePartitionUpsertMetadataManager 
implements PartitionUps
     if (!_enableSnapshot) {
       deleteSnapshot(segment);
     }
-    try (UpsertUtils.RecordInfoReader recordInfoReader = new 
UpsertUtils.RecordInfoReader(segment, _primaryKeyColumns,
-        _comparisonColumns, _deleteRecordColumn)) {
+    try (UpsertUtils.RecordInfoReader recordInfoReader = 
createRecordInfoReader(segment)) {
       Iterator<RecordInfo> recordInfoIterator =
           UpsertUtils.getRecordInfoIterator(recordInfoReader, 
segment.getSegmentMetadata().getTotalDocs());
       addSegment(segment, null, null, recordInfoIterator);
@@ -427,15 +442,14 @@ public abstract class BasePartitionUpsertMetadataManager 
implements PartitionUps
       segment.enableUpsert(this, new ThreadSafeMutableRoaringBitmap(), null);
       return;
     }
-    if (isTTLEnabled()) {
+    if (isTTLEnabled() && !_comparisonColumns.isEmpty()) {
       double maxComparisonValue = getMaxComparisonValue(segment);
       _largestSeenComparisonValue.getAndUpdate(v -> Math.max(v, 
maxComparisonValue));
       if (isOutOfMetadataTTL(maxComparisonValue) && 
skipPreloadSegmentOutOfTTL(segment, validDocIds)) {
         return;
       }
     }
-    try (UpsertUtils.RecordInfoReader recordInfoReader = new 
UpsertUtils.RecordInfoReader(segment, _primaryKeyColumns,
-        _comparisonColumns, _deleteRecordColumn)) {
+    try (UpsertUtils.RecordInfoReader recordInfoReader = 
createRecordInfoReader(segment)) {
       doPreloadSegment(segment, null, null, 
UpsertUtils.getRecordInfoIterator(recordInfoReader, validDocIds));
     } catch (Exception e) {
       throw new RuntimeException(
@@ -600,14 +614,13 @@ public abstract class BasePartitionUpsertMetadataManager 
implements PartitionUps
       replaceSegment(segment, null, null, null, oldSegment);
       return;
     }
-    if (isTTLEnabled()) {
+    if (isTTLEnabled() && !_comparisonColumns.isEmpty()) {
       double maxComparisonValue = getMaxComparisonValue(segment);
       _largestSeenComparisonValue.getAndUpdate(v -> Math.max(v, 
maxComparisonValue));
       // Segment might be uploaded directly to the table to replace an old 
segment. So update the TTL watermark but
       // we can't skip segment even if it's out of TTL as its validDocIds 
bitmap is not updated yet.
     }
-    try (UpsertUtils.RecordInfoReader recordInfoReader = new 
UpsertUtils.RecordInfoReader(segment, _primaryKeyColumns,
-        _comparisonColumns, _deleteRecordColumn)) {
+    try (UpsertUtils.RecordInfoReader recordInfoReader = 
createRecordInfoReader(segment)) {
       Iterator<RecordInfo> recordInfoIterator =
           UpsertUtils.getRecordInfoIterator(recordInfoReader, 
segment.getSegmentMetadata().getTotalDocs());
       replaceSegment(segment, null, null, recordInfoIterator, oldSegment);
@@ -787,7 +800,7 @@ public abstract class BasePartitionUpsertMetadataManager 
implements PartitionUps
     try {
       // Skip removing the upsert metadata of segment that is out of metadata 
TTL. The expired metadata is removed
       // while creating new consuming segment in batches.
-      if (isOutOfMetadataTTL(segment)) {
+      if (!_comparisonColumns.isEmpty() && isOutOfMetadataTTL(segment)) {
         _logger.info("Skip removing segment: {} because it's out of TTL", 
segmentName);
       } else {
         doRemoveSegment(segment);
@@ -1290,16 +1303,22 @@ public abstract class 
BasePartitionUpsertMetadataManager implements PartitionUps
   }
 
   /**
-   * Returns the ZooKeeper creation time for upsert consistency.
-   * This refers to the time set by the controller when creating new consuming 
segment.
-   * This is used to ensure consistent creation time across replicas for upsert
-   * operations.
-   * @return ZK creation time in milliseconds, or Long.MIN_VALUE if not set
+   * Returns the ZooKeeper update time for upsert consistency.
+   * For realtime table, this refers to the time set by the controller when 
creating new consuming segment.
+   * For offline table, this refers to the segment push time.
+   * This is used to ensure consistent creation time across replicas for 
upsert operations.
+   * @return ZK push time or creation time in milliseconds, or Long.MIN_VALUE 
if not set
    */
-  protected long getAuthoritativeCreationTime(IndexSegment segment) {
+  protected long getAuthoritativeUpdateOrCreationTime(IndexSegment segment) {
     SegmentMetadata segmentMetadata = segment.getSegmentMetadata();
     if (segmentMetadata instanceof SegmentMetadataImpl) {
       SegmentMetadataImpl segmentMetadataImpl = (SegmentMetadataImpl) 
segmentMetadata;
+      if (_tableType == TableType.OFFLINE) {
+        long zkPushTime = segmentMetadataImpl.getZkPushTime();
+        if (zkPushTime != Long.MIN_VALUE) {
+          return zkPushTime;
+        }
+      }
       long zkCreationTime = segmentMetadataImpl.getZkCreationTime();
       if (zkCreationTime != Long.MIN_VALUE) {
         return zkCreationTime;
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
index 0f268e6168a..fed81eafc92 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.segment.local.upsert;
 
 import com.google.common.base.Preconditions;
+import java.util.Collections;
 import java.util.List;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
@@ -59,7 +60,13 @@ public abstract class BaseTableUpsertMetadataManager 
implements TableUpsertMetad
 
     List<String> comparisonColumns = upsertConfig.getComparisonColumns();
     if (comparisonColumns == null) {
-      comparisonColumns = 
List.of(tableConfig.getValidationConfig().getTimeColumnName());
+      String timeColumnName = 
tableConfig.getValidationConfig().getTimeColumnName();
+      if (timeColumnName != null) {
+        comparisonColumns = List.of(timeColumnName);
+      } else {
+        // No comparison column and no time column: fall back to segment 
push/creation time for comparison
+        comparisonColumns = Collections.emptyList();
+      }
     }
 
     PartialUpsertHandler partialUpsertHandler = null;
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
index cd93d99d110..6edc1020150 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
@@ -145,8 +145,8 @@ public class ConcurrentMapPartitionUpsertMetadataManager 
extends BasePartitionUp
               // current value, but the segment has a larger sequence number 
(the segment is newer than the current
               // segment).
               if (comparisonResult > 0 || (comparisonResult == 0 && 
shouldReplaceOnComparisonTie(segmentName,
-                  currentSegmentName, getAuthoritativeCreationTime(segment),
-                  getAuthoritativeCreationTime(currentSegment)))) {
+                  currentSegmentName, 
getAuthoritativeUpdateOrCreationTime(segment),
+                  getAuthoritativeUpdateOrCreationTime(currentSegment)))) {
                 replaceDocId(segment, validDocIds, queryableDocIds, 
currentSegment, currentDocId, newDocId, recordInfo);
                 if (currentSegment != segment) {
                   _previousKeyToRecordLocationMap.put(primaryKey, 
currentRecordLocation);
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java
index 9699547d3db..6e42097ee9d 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java
@@ -180,8 +180,8 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes
               // current value, but the segment has a larger sequence number 
(the segment is newer than the current
               // segment).
               if (comparisonResult > 0 || (comparisonResult == 0 && 
shouldReplaceOnComparisonTie(segmentName,
-                  currentSegmentName, getAuthoritativeCreationTime(segment),
-                  getAuthoritativeCreationTime(currentSegment)))) {
+                  currentSegmentName, 
getAuthoritativeUpdateOrCreationTime(segment),
+                  getAuthoritativeUpdateOrCreationTime(currentSegment)))) {
                 replaceDocId(segment, validDocIds, queryableDocIds, 
currentSegment, currentDocId, newDocId, recordInfo);
                 return new RecordLocation(segment, newDocId, 
newComparisonValue,
                     
RecordLocation.incrementSegmentCount(currentDistinctSegmentCount));
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertContext.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertContext.java
index d576e325979..6ef76c8a66d 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertContext.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertContext.java
@@ -352,7 +352,7 @@ public class UpsertContext {
       Preconditions.checkState(_schema != null, "Schema must be set");
       Preconditions.checkState(CollectionUtils.isNotEmpty(_primaryKeyColumns), 
"Primary key columns must be set");
       Preconditions.checkState(_hashFunction != null, "Hash function must be 
set");
-      Preconditions.checkState(CollectionUtils.isNotEmpty(_comparisonColumns), 
"Comparison columns must be set");
+      Preconditions.checkState(_comparisonColumns != null, "Comparison columns 
must be set");
       Preconditions.checkState(_consistencyMode != null, "Consistency mode 
must be set");
       if (_tableIndexDir == null) {
         Preconditions.checkState(_tableDataManager != null, "Either table data 
manager or table index dir must be set");
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
index 42c20f8aadb..bedea992fd6 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
@@ -185,6 +185,21 @@ public class UpsertUtils {
       }
     }
 
+    /**
+     * Constructor that uses a constant comparison value for all records.
+     * Used when no comparison columns are configured and segment creation 
time is used as the comparison value.
+     */
+    public RecordInfoReader(IndexSegment segment, List<String> 
primaryKeyColumns,
+        Comparable constantComparisonValue, @Nullable String 
deleteRecordColumn) {
+      _primaryKeyReader = new PrimaryKeyReader(segment, primaryKeyColumns);
+      _comparisonColumnReader = new 
ConstantComparisonColumnReader(constantComparisonValue);
+      if (deleteRecordColumn != null) {
+        _deleteRecordColumnReader = new PinotSegmentColumnReader(segment, 
deleteRecordColumn);
+      } else {
+        _deleteRecordColumnReader = null;
+      }
+    }
+
     public RecordInfo getRecordInfo(int docId) {
       PrimaryKey primaryKey = _primaryKeyReader.getPrimaryKey(docId);
       Comparable comparisonValue = 
_comparisonColumnReader.getComparisonValue(docId);
@@ -266,4 +281,25 @@ public class UpsertUtils {
       }
     }
   }
+
+  /**
+   * A comparison column reader that returns a constant value for all records.
+   * Used when no comparison columns are configured and segment creation time 
is used as the comparison value.
+   */
+  public static class ConstantComparisonColumnReader implements 
ComparisonColumnReader {
+    private final Comparable _constantValue;
+
+    public ConstantComparisonColumnReader(Comparable constantValue) {
+      _constantValue = constantValue;
+    }
+
+    @Override
+    public Comparable getComparisonValue(int docId) {
+      return _constantValue;
+    }
+
+    @Override
+    public void close() {
+    }
+  }
 }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index 0947c5f6938..fefbf2efa96 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -766,7 +766,7 @@ public final class TableConfigUtils {
 
   /**
    * Validates the upsert-related configurations
-   *  - check table type is realtime
+   *  - check table type supports the configured mode
    *  - the primary key exists on the schema
    *  - strict replica-group is configured for routing type
    *  - consumer type must be low-level
@@ -786,9 +786,19 @@ public final class TableConfigUtils {
     // check both upsert and dedup are not enabled simultaneously
     Preconditions.checkState(!(isUpsertEnabled && isDedupEnabled),
         "A table can have either Upsert or Dedup enabled, but not both");
-    // check table type is realtime
-    Preconditions.checkState(tableConfig.getTableType() == TableType.REALTIME,
-        "Upsert/Dedup table is for realtime table only.");
+    if (tableConfig.getTableType() == TableType.OFFLINE) {
+      Preconditions.checkState(isUpsertEnabled && !isDedupEnabled,
+          "Dedup is not supported for OFFLINE table. Only upsert is supported 
for OFFLINE table");
+      // Offline upsert tables require segment partition config so that segments are assigned to servers
+      // based on partition, ensuring all segments of a partition land on the same server for correct upsert.
+      IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
+      SegmentPartitionConfig segmentPartitionConfig =
+          indexingConfig != null ? indexingConfig.getSegmentPartitionConfig() 
: null;
+      Preconditions.checkState(
+          segmentPartitionConfig != null && 
MapUtils.isNotEmpty(segmentPartitionConfig.getColumnPartitionMap()),
+          "Offline upsert table must have segment partition config to ensure 
correct partition-based "
+              + "segment assignment. Configure segmentPartitionConfig in the 
indexingConfig.");
+    }
     // primary key exists
     
Preconditions.checkState(CollectionUtils.isNotEmpty(schema.getPrimaryKeyColumns()),
         "Upsert/Dedup table must have primary key columns in the schema");
@@ -804,10 +814,12 @@ public final class TableConfigUtils {
     Preconditions.checkState(
         tableConfig.getRoutingConfig() != null && 
isRoutingStrategyAllowedForUpsert(tableConfig.getRoutingConfig()),
         "Upsert/Dedup table must use strict replica-group (i.e. 
strictReplicaGroup) based routing");
-    
Preconditions.checkState(tableConfig.getTenantConfig().getTagOverrideConfig() 
== null || (
-            
tableConfig.getTenantConfig().getTagOverrideConfig().getRealtimeConsuming() == 
null
-                && 
tableConfig.getTenantConfig().getTagOverrideConfig().getRealtimeCompleted() == 
null),
-        "Invalid tenant tag override used for Upsert/Dedup table");
+    if (tableConfig.getTableType() == TableType.REALTIME) {
+      
Preconditions.checkState(tableConfig.getTenantConfig().getTagOverrideConfig() 
== null || (
+              
tableConfig.getTenantConfig().getTagOverrideConfig().getRealtimeConsuming() == 
null
+                  && 
tableConfig.getTenantConfig().getTagOverrideConfig().getRealtimeCompleted() == 
null),
+          "Invalid tenant tag override used for Upsert/Dedup table");
+    }
 
     // specifically for upsert
     UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
@@ -907,10 +919,12 @@ public final class TableConfigUtils {
       }
     }
 
-    Preconditions.checkState(
-        tableConfig.getInstanceAssignmentConfigMap() == null || 
!tableConfig.getInstanceAssignmentConfigMap()
-            .containsKey(InstancePartitionsType.COMPLETED.name()),
-        "COMPLETED instance partitions can't be configured for upsert / dedup 
tables");
+    if (tableConfig.getTableType() == TableType.REALTIME) {
+      Preconditions.checkState(
+          tableConfig.getInstanceAssignmentConfigMap() == null || 
!tableConfig.getInstanceAssignmentConfigMap()
+              .containsKey(InstancePartitionsType.COMPLETED.name()),
+          "COMPLETED instance partitions can't be configured for upsert / 
dedup tables");
+    }
     validateAggregateMetricsForUpsertConfig(tableConfig);
     validateTTLForUpsertConfig(tableConfig, schema);
     validateTTLForDedupConfig(tableConfig, schema);
@@ -949,6 +963,8 @@ public final class TableConfigUtils {
           comparisonColumn, comparisonColumnDataType);
     } else {
       String comparisonColumn = 
tableConfig.getValidationConfig().getTimeColumnName();
+      Preconditions.checkState(comparisonColumn != null,
+          "MetadataTTL / DeletedKeysTTL requires either a comparison column or 
a time column to be configured");
       DataType comparisonColumnDataType = 
schema.getFieldSpecFor(comparisonColumn).getDataType();
       
Preconditions.checkState(isValidTimeComparisonType(comparisonColumnDataType),
           "MetadataTTL / DeletedKeysTTL must have time column: %s in numeric 
type, found: %s",
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index 0194c45633c..4bcb84bed09 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -1904,6 +1904,7 @@ public class TableConfigUtilsTest {
         .addSingleValueDimension("myCol", FieldSpec.DataType.STRING)
         .addDateTime(TIME_COLUMN, FieldSpec.DataType.LONG, 
"1:MILLISECONDS:EPOCH", "1:MILLISECONDS")
         .build();
+    // OFFLINE table should fail because dedup is not yet supported for 
offline tables
     TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
         .setTimeColumnName(TIME_COLUMN)
         .setDedupConfig(new DedupConfig())
@@ -1912,7 +1913,8 @@ public class TableConfigUtilsTest {
       TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
       fail();
     } catch (IllegalStateException e) {
-      assertEquals(e.getMessage(), "Upsert/Dedup table is for realtime table 
only.");
+      assertEquals(e.getMessage(), "Dedup is not supported for OFFLINE table. 
Only upsert is supported for OFFLINE"
+          + " table");
     }
 
     tableConfig =
@@ -2108,6 +2110,7 @@ public class TableConfigUtilsTest {
         .addSingleValueDimension("myCol", FieldSpec.DataType.STRING)
         .addDateTime(TIME_COLUMN, FieldSpec.DataType.LONG, 
"1:MILLISECONDS:EPOCH", "1:MILLISECONDS")
         .build();
+    // OFFLINE table without segment partition config should fail with 
partition config error
     UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
     TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
         .setUpsertConfig(upsertConfig)
@@ -2117,7 +2120,9 @@ public class TableConfigUtilsTest {
       TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
       fail();
     } catch (IllegalStateException e) {
-      assertEquals(e.getMessage(), "Upsert/Dedup table is for realtime table 
only.");
+      assertEquals(e.getMessage(),
+          "Offline upsert table must have segment partition config to ensure 
correct partition-based "
+              + "segment assignment. Configure segmentPartitionConfig in the 
indexingConfig.");
     }
 
     tableConfig =
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/SegmentMetadataImpl.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/SegmentMetadataImpl.java
index 1f3a0c9a52e..8a8394c7f32 100644
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/SegmentMetadataImpl.java
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/SegmentMetadataImpl.java
@@ -81,6 +81,7 @@ public class SegmentMetadataImpl implements SegmentMetadata {
   private long _dataCrc = Long.MIN_VALUE;
   private long _creationTime = Long.MIN_VALUE;
   private long _zkCreationTime = Long.MIN_VALUE;  // ZooKeeper creation time 
for upsert consistency
+  private long _zkPushTime = Long.MIN_VALUE; // ZooKeeper push time for upsert 
consistency
   private String _timeColumn;
   private TimeUnit _timeUnit;
   private Duration _timeGranularity;
@@ -412,9 +413,10 @@ public class SegmentMetadataImpl implements 
SegmentMetadata {
 
   /**
    * Returns the ZooKeeper creation time for upsert consistency.
-   * This refers to the time set by controller while creating the consuming 
segment. It is used to ensure consistent
-   * creation time across replicas for upsert operations.
-   * @return ZK creation time in milliseconds, or Long.MIN_VALUE if not set
+   * For REALTIME tables, this is set by the controller when the consuming 
segment is created, ensuring consistent
+   * creation time across replicas. For segments loaded from disk, this 
returns {@code Long.MIN_VALUE} until
+   * {@link #setZkCreationTime(long)} is explicitly called (e.g. from ZK 
metadata during segment loading).
+   * @return ZK creation time in milliseconds, or {@code Long.MIN_VALUE} if 
not explicitly set
    */
   public long getZkCreationTime() {
     return _zkCreationTime;
@@ -428,6 +430,24 @@ public class SegmentMetadataImpl implements 
SegmentMetadata {
     _zkCreationTime = zkCreationTime;
   }
 
+  /**
+   * Returns the ZooKeeper push time for upsert consistency.
+   * This refers to the time set by controller while pushing the segment. It 
is used to ensure consistent
+   * push time across replicas for upsert operations.
+   * @return ZK push time in milliseconds, or Long.MIN_VALUE if not set
+   */
+  public long getZkPushTime() {
+    return _zkPushTime;
+  }
+
+  /**
+   * Sets the ZooKeeper push time for upsert consistency.
+   * @param zkPushTime ZK push time in milliseconds
+   */
+  public void setZkPushTime(long zkPushTime) {
+    _zkPushTime = zkPushTime;
+  }
+
   @Override
   public long getLastIndexedTimestamp() {
     return Long.MIN_VALUE;
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/PrimaryKeyCount.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/PrimaryKeyCount.java
index e45881a5e88..0914638fa71 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/PrimaryKeyCount.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/PrimaryKeyCount.java
@@ -24,7 +24,6 @@ import java.util.Set;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.common.restlet.resources.PrimaryKeyCountInfo;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
-import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -60,14 +59,9 @@ public class PrimaryKeyCount {
         LOGGER.warn("TableDataManager for table: {} is null, skipping", 
tableNameWithType);
         continue;
       }
-      if (tableDataManager instanceof RealtimeTableDataManager) {
-        Map<Integer, Long> partitionToPrimaryKeyCount =
-            ((RealtimeTableDataManager) 
tableDataManager).getPartitionToPrimaryKeyCount();
-
-        if (!partitionToPrimaryKeyCount.isEmpty()) {
-          tablesWithPrimaryKeys.add(tableNameWithType);
-        }
-
+      Map<Integer, Long> partitionToPrimaryKeyCount = 
tableDataManager.getPartitionToPrimaryKeyCount();
+      if (!partitionToPrimaryKeyCount.isEmpty()) {
+        tablesWithPrimaryKeys.add(tableNameWithType);
         for (Long numPrimaryKeys : partitionToPrimaryKeyCount.values()) {
           totalPrimaryKeyCount += numPrimaryKeys == null ? 0 : numPrimaryKeys;
         }
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
index 85a842d54e7..aa6d7f8526c 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
@@ -296,12 +296,8 @@ public class TablesResource {
       }
     }
 
-    // fetch partition to primary key count for realtime tables that have 
upsert or dedup enabled
-    Map<Integer, Long> partitionToPrimaryKeyCountMap = new HashMap<>();
-    if (tableDataManager instanceof RealtimeTableDataManager) {
-      RealtimeTableDataManager realtimeTableDataManager = 
(RealtimeTableDataManager) tableDataManager;
-      partitionToPrimaryKeyCountMap = 
realtimeTableDataManager.getPartitionToPrimaryKeyCount();
-    }
+    // fetch partition to primary key count for tables that have upsert or 
dedup enabled
+    Map<Integer, Long> partitionToPrimaryKeyCountMap = 
tableDataManager.getPartitionToPrimaryKeyCount();
 
     // construct partitionToServerPrimaryKeyCountMap to populate in 
TableMetadataInfo
     Map<Integer, Map<String, Long>> partitionToServerPrimaryKeyCountMap = new 
HashMap<>();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to