This is an automated email from the ASF dual-hosted git repository.

Jackie-Jiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new cd6dd61ce32 Rebuild upsert metadata from validDocIds snapshot on 
reload for TTL tables (#18860)
cd6dd61ce32 is described below

commit cd6dd61ce3240efd62c2de851efc685bccd7e79e
Author: Chaitanya Deepthi <[email protected]>
AuthorDate: Mon Jun 29 00:28:15 2026 -0700

    Rebuild upsert metadata from validDocIds snapshot on reload for TTL tables 
(#18860)
---
 .../core/data/manager/BaseTableDataManager.java    | 38 ++++++++++++++++++-
 .../upsert/BasePartitionUpsertMetadataManager.java | 26 ++++++++++++-
 ...rrentMapPartitionUpsertMetadataManagerTest.java | 44 ++++++++++++++++++++++
 3 files changed, 104 insertions(+), 4 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index 64843370fcd..8bb9393bb4e 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -83,6 +83,7 @@ import 
org.apache.pinot.segment.local.startree.StarTreeBuilderUtils;
 import 
org.apache.pinot.segment.local.startree.v2.builder.StarTreeV2BuilderConfig;
 import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
 import org.apache.pinot.segment.local.upsert.TableUpsertMetadataManager;
+import org.apache.pinot.segment.local.upsert.UpsertContext;
 import org.apache.pinot.segment.local.utils.SegmentLocks;
 import org.apache.pinot.segment.local.utils.SegmentOperationsThrottler;
 import org.apache.pinot.segment.local.utils.SegmentOperationsThrottlerSet;
@@ -94,6 +95,7 @@ import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.SegmentContext;
 import org.apache.pinot.segment.spi.SegmentMetadata;
+import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.segment.spi.datasource.DataSource;
 import org.apache.pinot.segment.spi.index.FieldIndexConfigs;
 import org.apache.pinot.segment.spi.index.FieldIndexConfigsUtil;
@@ -106,6 +108,7 @@ import 
org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext;
 import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry;
 import org.apache.pinot.segment.spi.partition.PartitionFunction;
 import org.apache.pinot.segment.spi.store.SegmentDirectory;
+import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
 import org.apache.pinot.spi.auth.AuthProvider;
 import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
 import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
@@ -1121,12 +1124,39 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
       - Copy the backup directory back to the original index directory.
       - Continue loading the segment from the index directory.
       */
-      boolean shouldDownload =
-          forceDownload || (isSegmentStatusCompleted(zkMetadata) && 
!hasSameCRC(zkMetadata, localMetadata)
+      boolean crcMatch = hasSameCRC(zkMetadata, localMetadata);
+      boolean shouldDownload = forceDownload
+          || (isSegmentStatusCompleted(zkMetadata) && !crcMatch
               && _instanceDataManagerConfig.shouldCheckCRCOnSegmentLoad());
+      // For an upsert table with metadata TTL and snapshots enabled, a reload 
must not blindly re-scan the segment and
+      // re-add every primary key, as that would resurrect keys already 
expired (metadataTTL) or deleted
+      // (deletedKeysTTL). Instead we rebuild the upsert metadata from the 
persisted validDocIds snapshot (valid docs
+      // only). The snapshot is docId-position based, so it only maps to the 
segment being loaded when the content (CRC)
+      // is unchanged. A CRC mismatch with shouldDownload means a different 
(downloaded) copy will be loaded, so
+      // reusing the local snapshot could silently rebuild upsert state 
against the wrong docIds: fail closed in that
+      // case. When CRC checking is disabled 
(instance.check.crc.on.segment.load=false) a CRC-mismatched normal reload
+      // keeps the local segment (shouldDownload is false), so the local 
snapshot still maps and the reload proceeds.
+      UpsertContext upsertContext = isUpsertEnabled() ? 
_tableUpsertMetadataManager.getContext() : null;
+      boolean restoreValidDocIdsSnapshot = upsertContext != null && 
upsertContext.isSnapshotEnabled()
+          && (upsertContext.getMetadataTTL() > 0 || 
upsertContext.getDeletedKeysTTL() > 0);
+      if (restoreValidDocIdsSnapshot && !crcMatch && shouldDownload) {
+        throw new IllegalStateException(String.format(
+            "Failing reload for segment: %s of upsert table with metadata TTL: 
%s because the segment CRC has "
+                + "changed from: %s to: %s and the docId-based validDocIds 
snapshot cannot be guaranteed to map to "
+                + "the reloaded segment. Reload once the local copy matches 
the deep-store segment, or recreate the "
+                + "snapshot from the new segment.", segmentName, 
_tableNameWithType, localMetadata.getCrc(),
+            zkMetadata.getCrc()));
+      }
       if (shouldDownload) {
         // Create backup directory to handle failure of segment reloading.
         createBackup(indexDir);
+        // The validDocIds snapshot is a server-local file absent from the 
deep-store copy. createBackup moved it into
+        // the backup dir; capture that path so it can be copied into the 
freshly downloaded segment dir below (the
+        // non-download copyTo path already restores it via copyDirectory).
+        File backupValidDocIdsSnapshotFile = restoreValidDocIdsSnapshot ? new 
File(
+            SegmentDirectoryPaths.findSegmentDirectory(new 
File(indexDir.getParentFile(),
+                indexDir.getName() + 
CommonConstants.Segment.SEGMENT_BACKUP_DIR_SUFFIX)),
+            V1Constants.VALID_DOC_IDS_SNAPSHOT_FILE_NAME) : null;
         if (forceDownload) {
           _logger.info("Force downloading segment: {}", segmentName);
         } else {
@@ -1134,6 +1164,10 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
               localMetadata.getCrc(), zkMetadata.getCrc());
         }
         indexDir = downloadSegment(zkMetadata);
+        if (backupValidDocIdsSnapshotFile != null && 
backupValidDocIdsSnapshotFile.exists()) {
+          FileUtils.copyFile(backupValidDocIdsSnapshotFile, new File(
+              SegmentDirectoryPaths.findSegmentDirectory(indexDir), 
V1Constants.VALID_DOC_IDS_SNAPSHOT_FILE_NAME));
+        }
       } else {
         _logger.info("Reloading existing segment: {} on tier: {}", segmentName,
             TierConfigUtils.normalizeTierName(segmentTier));
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
index dc080b173ce..bdc065d0e8d 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -611,8 +611,30 @@ public abstract class BasePartitionUpsertMetadataManager 
implements PartitionUps
       // we can't skip segment even if it's out of TTL as its validDocIds 
bitmap is not updated yet.
     }
     try (UpsertUtils.RecordInfoReader recordInfoReader = 
createRecordInfoReader(segment)) {
-      Iterator<RecordInfo> recordInfoIterator =
-          UpsertUtils.getRecordInfoIterator(recordInfoReader, 
segment.getSegmentMetadata().getTotalDocs());
+      // Reload-only fast path for an upsert + TTL table. The incoming segment 
carries a validDocIds snapshot ONLY when
+      // the reload flow placed it there (see 
BaseTableDataManager.reloadSegment); segment commits and uploads always
+      // build a fresh segment without one and fall through to the full scan 
below, unaffected. Rebuilding from just the
+      // snapshot's valid docs avoids resurrecting primary keys already 
expired/deleted by TTL.
+      MutableRoaringBitmap validDocIdsSnapshot = null;
+      if (isTTLEnabled() && segment instanceof ImmutableSegmentImpl) {
+        validDocIdsSnapshot =
+            ((ImmutableSegmentImpl) 
segment).loadDocIdsFromSnapshot(V1Constants.VALID_DOC_IDS_SNAPSHOT_FILE_NAME);
+      }
+      Iterator<RecordInfo> recordInfoIterator;
+      if (validDocIdsSnapshot != null) {
+        // Rebuild from the snapshot's docIds as-is. The reload flow (BaseTableDataManager.reloadSegment) fails closed
+        // when the segment CRC changed and a different copy would be downloaded, so a snapshot present here is expected
+        // to have been taken on a copy with the same content, and its docIds are assumed to map to the same rows.
+        _logger.info("Replacing segment: {} on reload using validDocIds 
snapshot with {} valid docs (snapshot docIds "
+            + "are used as-is and assumed to map to this segment)", 
segmentName, validDocIdsSnapshot.getCardinality());
+        recordInfoIterator = 
UpsertUtils.getRecordInfoIterator(recordInfoReader, validDocIdsSnapshot);
+      } else {
+        // No validDocIds snapshot on the incoming segment: rebuild by 
scanning all docs in the segment.
+        _logger.info("Replacing segment: {} using all {} docs (no validDocIds 
snapshot available)", segmentName,
+            segment.getSegmentMetadata().getTotalDocs());
+        recordInfoIterator =
+            UpsertUtils.getRecordInfoIterator(recordInfoReader, 
segment.getSegmentMetadata().getTotalDocs());
+      }
       replaceSegment(segment, null, null, recordInfoIterator, oldSegment);
     } catch (Exception e) {
       throw new RuntimeException(
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
index f9c58b83949..dbba9b0e2eb 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
@@ -290,6 +290,50 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerTest {
     upsertMetadataManager.close();
   }
 
+  @Test
+  public void testReplaceSegmentOnReloadRebuildsFromSnapshot()
+      throws Exception {
+    // Upsert + metadata TTL with snapshots enabled: a reload must rebuild 
upsert metadata from the validDocIds
+    // snapshot (valid docs only), so docs invalidated since the segment was 
first loaded are not resurrected.
+    _contextBuilder.setEnableSnapshot(true).setMetadataTTL(30);
+    ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+        new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 
0, _contextBuilder.build());
+    Map<Object, RecordLocation> recordLocationMap = 
upsertMetadataManager._primaryKeyToRecordLocationMap;
+
+    String segmentName = "reload_snapshot_segment";
+    int[] primaryKeys = new int[]{10, 30, 40};
+    int[] timestamps = new int[]{1500, 3500, 4000};
+
+    // Initial load of the segment with 3 valid docs.
+    ThreadSafeMutableRoaringBitmap validDocIds1 = new 
ThreadSafeMutableRoaringBitmap();
+    ImmutableSegmentImpl segment1 = createRealSegment(segmentName, 
primaryKeys, timestamps, validDocIds1);
+    upsertMetadataManager.addSegment(segment1);
+    assertEquals(recordLocationMap.size(), 3);
+
+    // Reload: a fresh copy of the same segment that carries a validDocIds 
snapshot marking only docs {0, 2} valid
+    // (doc 1 / PK 30 was invalidated). The reload must rebuild from the 
snapshot, not rescan all docs.
+    ThreadSafeMutableRoaringBitmap validDocIds2 = new 
ThreadSafeMutableRoaringBitmap();
+    ImmutableSegmentImpl segment2 = createRealSegment(segmentName, 
primaryKeys, timestamps, validDocIds2);
+    ThreadSafeMutableRoaringBitmap snapshot = new 
ThreadSafeMutableRoaringBitmap();
+    snapshot.add(0);
+    snapshot.add(2);
+    
segment2.persistDocIdsSnapshot(V1Constants.VALID_DOC_IDS_SNAPSHOT_FILE_NAME, 
snapshot.getBytesAndCardinality());
+
+    upsertMetadataManager.replaceSegment(segment2, segment1);
+
+    // PK 30 (doc 1) is not resurrected; only PK 10 and PK 40 remain, now in 
the reloaded segment.
+    assertEquals(recordLocationMap.size(), 2);
+    checkRecordLocation(recordLocationMap, 10, segment2, 0, 1500, 
HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 40, segment2, 2, 4000, 
HashFunction.NONE);
+    
assertNull(recordLocationMap.get(HashUtils.hashPrimaryKey(makePrimaryKey(30), 
HashFunction.NONE)));
+    
assertEquals(segment2.getValidDocIds().getMutableRoaringBitmap().getCardinality(),
 2);
+
+    upsertMetadataManager.stop();
+    upsertMetadataManager.close();
+    segment1.destroy();
+    segment2.destroy();
+  }
+
   @Test
   public void testGetQueryableDocIds() {
     _contextBuilder.setDeleteRecordColumn(DELETE_RECORD_COLUMN);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to