This is an automated email from the ASF dual-hosted git repository.
Jackie-Jiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new cd6dd61ce32 Rebuild upsert metadata from validDocIds snapshot on
reload for TTL tables (#18860)
cd6dd61ce32 is described below
commit cd6dd61ce3240efd62c2de851efc685bccd7e79e
Author: Chaitanya Deepthi <[email protected]>
AuthorDate: Mon Jun 29 00:28:15 2026 -0700
Rebuild upsert metadata from validDocIds snapshot on reload for TTL tables
(#18860)
---
.../core/data/manager/BaseTableDataManager.java | 38 ++++++++++++++++++-
.../upsert/BasePartitionUpsertMetadataManager.java | 26 ++++++++++++-
...rrentMapPartitionUpsertMetadataManagerTest.java | 44 ++++++++++++++++++++++
3 files changed, 104 insertions(+), 4 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index 64843370fcd..8bb9393bb4e 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -83,6 +83,7 @@ import
org.apache.pinot.segment.local.startree.StarTreeBuilderUtils;
import
org.apache.pinot.segment.local.startree.v2.builder.StarTreeV2BuilderConfig;
import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
import org.apache.pinot.segment.local.upsert.TableUpsertMetadataManager;
+import org.apache.pinot.segment.local.upsert.UpsertContext;
import org.apache.pinot.segment.local.utils.SegmentLocks;
import org.apache.pinot.segment.local.utils.SegmentOperationsThrottler;
import org.apache.pinot.segment.local.utils.SegmentOperationsThrottlerSet;
@@ -94,6 +95,7 @@ import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.SegmentContext;
import org.apache.pinot.segment.spi.SegmentMetadata;
+import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.segment.spi.datasource.DataSource;
import org.apache.pinot.segment.spi.index.FieldIndexConfigs;
import org.apache.pinot.segment.spi.index.FieldIndexConfigsUtil;
@@ -106,6 +108,7 @@ import
org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext;
import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry;
import org.apache.pinot.segment.spi.partition.PartitionFunction;
import org.apache.pinot.segment.spi.store.SegmentDirectory;
+import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
import org.apache.pinot.spi.auth.AuthProvider;
import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
@@ -1121,12 +1124,39 @@ public abstract class BaseTableDataManager implements
TableDataManager {
- Copy the backup directory back to the original index directory.
- Continue loading the segment from the index directory.
*/
- boolean shouldDownload =
- forceDownload || (isSegmentStatusCompleted(zkMetadata) &&
!hasSameCRC(zkMetadata, localMetadata)
+ boolean crcMatch = hasSameCRC(zkMetadata, localMetadata);
+ boolean shouldDownload = forceDownload
+ || (isSegmentStatusCompleted(zkMetadata) && !crcMatch
&& _instanceDataManagerConfig.shouldCheckCRCOnSegmentLoad());
+ // For an upsert table with metadata TTL and snapshots enabled, a reload
must not blindly re-scan the segment and
+ // re-add every primary key, as that would resurrect keys already
expired (metadataTTL) or deleted
+ // (deletedKeysTTL). Instead we rebuild the upsert metadata from the
persisted validDocIds snapshot (valid docs
+ // only). The snapshot is docId-position based, so it only maps to the
segment being loaded when the content (CRC)
+ // is unchanged. A CRC mismatch with shouldDownload means a different
(downloaded) copy will be loaded, so
+ // reusing the local snapshot could silently rebuild upsert state
against the wrong docIds: fail closed in that
+ // case. When CRC checking is disabled
(instance.check.crc.on.segment.load=false) a CRC-mismatched normal reload
+ // keeps the local segment (shouldDownload is false), so the local
snapshot still maps and the reload proceeds.
+ UpsertContext upsertContext = isUpsertEnabled() ?
_tableUpsertMetadataManager.getContext() : null;
+ boolean restoreValidDocIdsSnapshot = upsertContext != null &&
upsertContext.isSnapshotEnabled()
+ && (upsertContext.getMetadataTTL() > 0 ||
upsertContext.getDeletedKeysTTL() > 0);
+ if (restoreValidDocIdsSnapshot && !crcMatch && shouldDownload) {
+ throw new IllegalStateException(String.format(
+ "Failing reload for segment: %s of upsert table with metadata TTL:
%s because the segment CRC has "
+ + "changed from: %s to: %s and the docId-based validDocIds
snapshot cannot be guaranteed to map to "
+ + "the reloaded segment. Reload once the local copy matches
the deep-store segment, or recreate the "
+ + "snapshot from the new segment.", segmentName,
_tableNameWithType, localMetadata.getCrc(),
+ zkMetadata.getCrc()));
+ }
if (shouldDownload) {
// Create backup directory to handle failure of segment reloading.
createBackup(indexDir);
+ // The validDocIds snapshot is a server-local file absent from the
deep-store copy. createBackup moved it into
+ // the backup dir; capture that path so it can be copied into the
freshly downloaded segment dir below (the
+ // non-download copyTo path already restores it via copyDirectory).
+ File backupValidDocIdsSnapshotFile = restoreValidDocIdsSnapshot ? new
File(
+ SegmentDirectoryPaths.findSegmentDirectory(new
File(indexDir.getParentFile(),
+ indexDir.getName() +
CommonConstants.Segment.SEGMENT_BACKUP_DIR_SUFFIX)),
+ V1Constants.VALID_DOC_IDS_SNAPSHOT_FILE_NAME) : null;
if (forceDownload) {
_logger.info("Force downloading segment: {}", segmentName);
} else {
@@ -1134,6 +1164,10 @@ public abstract class BaseTableDataManager implements
TableDataManager {
localMetadata.getCrc(), zkMetadata.getCrc());
}
indexDir = downloadSegment(zkMetadata);
+ if (backupValidDocIdsSnapshotFile != null &&
backupValidDocIdsSnapshotFile.exists()) {
+ FileUtils.copyFile(backupValidDocIdsSnapshotFile, new File(
+ SegmentDirectoryPaths.findSegmentDirectory(indexDir),
V1Constants.VALID_DOC_IDS_SNAPSHOT_FILE_NAME));
+ }
} else {
_logger.info("Reloading existing segment: {} on tier: {}", segmentName,
TierConfigUtils.normalizeTierName(segmentTier));
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
index dc080b173ce..bdc065d0e8d 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -611,8 +611,30 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
// we can't skip segment even if it's out of TTL as its validDocIds
bitmap is not updated yet.
}
try (UpsertUtils.RecordInfoReader recordInfoReader =
createRecordInfoReader(segment)) {
- Iterator<RecordInfo> recordInfoIterator =
- UpsertUtils.getRecordInfoIterator(recordInfoReader,
segment.getSegmentMetadata().getTotalDocs());
+ // Reload-only fast path for an upsert + TTL table. The incoming segment
carries a validDocIds snapshot ONLY when
+ // the reload flow placed it there (see
BaseTableDataManager.reloadSegment); segment commits and uploads always
+ // build a fresh segment without one and fall through to the full scan
below, unaffected. Rebuilding from just the
+ // snapshot's valid docs avoids resurrecting primary keys already
expired/deleted by TTL.
+ MutableRoaringBitmap validDocIdsSnapshot = null;
+ if (isTTLEnabled() && segment instanceof ImmutableSegmentImpl) {
+ validDocIdsSnapshot =
+ ((ImmutableSegmentImpl)
segment).loadDocIdsFromSnapshot(V1Constants.VALID_DOC_IDS_SNAPSHOT_FILE_NAME);
+ }
+ Iterator<RecordInfo> recordInfoIterator;
+ if (validDocIdsSnapshot != null) {
+ // Rebuild from the snapshot's docIds as-is. On a forceDownload reload
with a CRC mismatch the snapshot may have
+ // been taken on a different copy of the segment, so its docIds are
not guaranteed to map to the same rows here;
+ // the operator accepted that risk by requesting forceDownload.
+ _logger.info("Replacing segment: {} on reload using validDocIds
snapshot with {} valid docs (snapshot docIds "
+ + "are used as-is and assumed to map to this segment)",
segmentName, validDocIdsSnapshot.getCardinality());
+ recordInfoIterator =
UpsertUtils.getRecordInfoIterator(recordInfoReader, validDocIdsSnapshot);
+ } else {
+ // No validDocIds snapshot on the incoming segment: rebuild by
scanning all docs in the segment.
+ _logger.info("Replacing segment: {} using all {} docs (no validDocIds
snapshot available)", segmentName,
+ segment.getSegmentMetadata().getTotalDocs());
+ recordInfoIterator =
+ UpsertUtils.getRecordInfoIterator(recordInfoReader,
segment.getSegmentMetadata().getTotalDocs());
+ }
replaceSegment(segment, null, null, recordInfoIterator, oldSegment);
} catch (Exception e) {
throw new RuntimeException(
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
index f9c58b83949..dbba9b0e2eb 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
@@ -290,6 +290,50 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerTest {
upsertMetadataManager.close();
}
+ @Test
+ public void testReplaceSegmentOnReloadRebuildsFromSnapshot()
+ throws Exception {
+ // Upsert + metadata TTL with snapshots enabled: a reload must rebuild
upsert metadata from the validDocIds
+ // snapshot (valid docs only), so docs invalidated since the segment was
first loaded are not resurrected.
+ _contextBuilder.setEnableSnapshot(true).setMetadataTTL(30);
+ ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+ new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME,
0, _contextBuilder.build());
+ Map<Object, RecordLocation> recordLocationMap =
upsertMetadataManager._primaryKeyToRecordLocationMap;
+
+ String segmentName = "reload_snapshot_segment";
+ int[] primaryKeys = new int[]{10, 30, 40};
+ int[] timestamps = new int[]{1500, 3500, 4000};
+
+ // Initial load of the segment with 3 valid docs.
+ ThreadSafeMutableRoaringBitmap validDocIds1 = new
ThreadSafeMutableRoaringBitmap();
+ ImmutableSegmentImpl segment1 = createRealSegment(segmentName,
primaryKeys, timestamps, validDocIds1);
+ upsertMetadataManager.addSegment(segment1);
+ assertEquals(recordLocationMap.size(), 3);
+
+ // Reload: a fresh copy of the same segment that carries a validDocIds
snapshot marking only docs {0, 2} valid
+ // (doc 1 / PK 30 was invalidated). The reload must rebuild from the
snapshot, not rescan all docs.
+ ThreadSafeMutableRoaringBitmap validDocIds2 = new
ThreadSafeMutableRoaringBitmap();
+ ImmutableSegmentImpl segment2 = createRealSegment(segmentName,
primaryKeys, timestamps, validDocIds2);
+ ThreadSafeMutableRoaringBitmap snapshot = new
ThreadSafeMutableRoaringBitmap();
+ snapshot.add(0);
+ snapshot.add(2);
+
segment2.persistDocIdsSnapshot(V1Constants.VALID_DOC_IDS_SNAPSHOT_FILE_NAME,
snapshot.getBytesAndCardinality());
+
+ upsertMetadataManager.replaceSegment(segment2, segment1);
+
+ // PK 30 (doc 1) is not resurrected; only PK 10 and PK 40 remain, now in
the reloaded segment.
+ assertEquals(recordLocationMap.size(), 2);
+ checkRecordLocation(recordLocationMap, 10, segment2, 0, 1500,
HashFunction.NONE);
+ checkRecordLocation(recordLocationMap, 40, segment2, 2, 4000,
HashFunction.NONE);
+
assertNull(recordLocationMap.get(HashUtils.hashPrimaryKey(makePrimaryKey(30),
HashFunction.NONE)));
+
assertEquals(segment2.getValidDocIds().getMutableRoaringBitmap().getCardinality(),
2);
+
+ upsertMetadataManager.stop();
+ upsertMetadataManager.close();
+ segment1.destroy();
+ segment2.destroy();
+ }
+
@Test
public void testGetQueryableDocIds() {
_contextBuilder.setDeleteRecordColumn(DELETE_RECORD_COLUMN);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org