This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 0ea4d1f3ebc Deterministic data-only CRC (2/2) - use data-only CRC for
realtime committing segments during replace (#17380)
0ea4d1f3ebc is described below
commit 0ea4d1f3ebca614b3bca98d1974a91cc1fb0f5e4
Author: Anurag Rai <[email protected]>
AuthorDate: Thu Jan 15 02:25:13 2026 +0530
Deterministic data-only CRC (2/2) - use data-only CRC for realtime
committing segments during replace (#17380)
---
.../common/metadata/segment/SegmentZKMetadata.java | 16 +++++
.../metadata/segment/SegmentZKMetadataUtils.java | 8 ++-
.../core/data/manager/BaseTableDataManager.java | 10 +++-
.../data/manager/BaseTableDataManagerTest.java | 70 ++++++++++++++++++++++
.../apache/pinot/spi/utils/CommonConstants.java | 1 +
5 files changed, 103 insertions(+), 2 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadata.java b/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadata.java
index 8938cd30809..730994cab78 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadata.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadata.java
@@ -182,6 +182,22 @@ public class SegmentZKMetadata implements ZKMetadata {
setNonNegativeValue(Segment.DATA_CRC, dataCrc);
}
+ /**
+ * Returns whether the {@code Segment.USE_DATA_CRC} simple field is set to "true".
+ * A missing field yields {@code false}, since {@link Boolean#parseBoolean(String)} returns false for null.
+ */
+ public boolean isUseDataCrc() {
+ String useDataCrcString = _simpleFields.get(Segment.USE_DATA_CRC);
+ return Boolean.parseBoolean(useDataCrcString);
+ }
+
+ /**
+ * Sets the flag telling servers that, when replacing a local segment copy, a matching data-only CRC
+ * (when available) may be accepted even if the full segment CRC differs.
+ * SegmentZKMetadataUtils sets it for realtime segments committed from CONSUMING state that have a
+ * non-negative data CRC, and clears it for uploaded segments.
+ * <p>{@code true} is stored as the string "true"; {@code false} removes the key, so an unset flag and a
+ * false flag look the same in ZK.
+ */
+ public void setUseDataCrc(boolean useDataCrc) {
+ if (useDataCrc) {
+ _simpleFields.put(Segment.USE_DATA_CRC, "true");
+ } else {
+ _simpleFields.remove(Segment.USE_DATA_CRC);
+ }
+ }
+
public String getTier() {
return _simpleFields.get(Segment.TIER);
}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadataUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadataUtils.java
index 0c72fcd7600..9df510b3d54 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadataUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadataUtils.java
@@ -94,6 +94,10 @@ public class SegmentZKMetadataUtils {
segmentZKMetadata.setEndOffset(endOffset);
segmentZKMetadata.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
+ // for committing segments, we use data CRC to replace but only if the
data CRC is present for the segment
+ if (Long.parseLong(segmentMetadata.getDataCrc()) >= 0) {
+ segmentZKMetadata.setUseDataCrc(true);
+ }
// For committing segment, use current time as start/end time if total
docs is 0
if (segmentMetadata.getTotalDocs() > 0) {
@@ -110,7 +114,9 @@ public class SegmentZKMetadataUtils {
segmentZKMetadata.setTimeUnit(TimeUnit.MILLISECONDS);
} else {
// For uploaded segment
-
+ // clear the use data crc flag for uploaded segments
+ // This flag should ONLY be true for segments committed from CONSUMING
state
+ segmentZKMetadata.setUseDataCrc(false);
// Set segment status, start/end offset info for real-time table
if (TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
segmentZKMetadata.setStatus(CommonConstants.Segment.Realtime.Status.UPLOADED);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index 29aadee1aa5..5694a9aaecc 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -1659,8 +1659,16 @@ public abstract class BaseTableDataManager implements TableDataManager {
return segmentDirectoryLoader.load(indexDir.toURI(), loaderContext);
}
+ // Returns true when the local segment matches the ZK metadata by CRC, in either of two ways:
+ // 1. the full segment CRCs are equal, or
+ // 2. the ZK useDataCrc flag is set, both the ZK and local data-only CRCs are non-negative (a negative
+ // data CRC means "not available"), and the two data CRCs are equal.
private static boolean hasSameCRC(SegmentZKMetadata zkMetadata,
SegmentMetadata localMetadata) {
- return zkMetadata.getCrc() == Long.parseLong(localMetadata.getCrc());
+ if (zkMetadata.getCrc() == Long.parseLong(localMetadata.getCrc())) {
+ return true;
+ }
+ // Fall back to the data-only CRC only when ZK explicitly opts in via the flag.
+ // NOTE(review): Long.parseLong throws NumberFormatException for null or non-numeric input. This assumes
+ // localMetadata.getDataCrc() always returns a numeric string (e.g. "-1" when absent). TODO confirm this
+ // for segments built before data CRC existed. The local data CRC is also parsed twice below.
+ return zkMetadata.isUseDataCrc()
+ && zkMetadata.getDataCrc() >= 0
+ && Long.parseLong(localMetadata.getDataCrc()) >= 0
+ && zkMetadata.getDataCrc() ==
Long.parseLong(localMetadata.getDataCrc());
}
private static void recoverReloadFailureQuietly(String tableNameWithType,
String segmentName, File indexDir) {
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
index 034cea027b9..cf7053e985e 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
@@ -802,6 +802,76 @@ public class BaseTableDataManagerTest {
}
}
+ /**
+ * With useDataCrc disabled, a segment-CRC mismatch (2048 in ZK vs 1024 locally) must trigger a download,
+ * even though the ZK and local data CRCs are both 99999.
+ */
+ @Test
+ public void
testReplaceSegmentIfCrcMismatchWhenFlagDisabledSegmentCrcMismatchShouldDownload()
+ throws Exception {
+ // When flag is disabled and segment CRCs don't match, should download
+ SegmentZKMetadata zkMetadata = createRawSegment(SegmentVersion.v3, 5);
+ zkMetadata.setCrc(2048L); // Different from local
+ zkMetadata.setDataCrc(99999L);
+ zkMetadata.setUseDataCrc(false);
+
+ ImmutableSegmentDataManager segmentDataManager =
createImmutableSegmentDataManager(SEGMENT_NAME, 1024L);
+ SegmentMetadata segmentMetadata =
segmentDataManager.getSegment().getSegmentMetadata();
+ // Stub the local data CRC to equal the ZK data CRC, so that only the disabled flag prevents a data-CRC match.
+ when(segmentMetadata.getDataCrc()).thenReturn("99999");
+
+ BaseTableDataManager tableDataManager = createTableManager();
+ File dataDir = tableDataManager.getSegmentDataDir(SEGMENT_NAME);
+ assertFalse(dataDir.exists());
+
+ // Should download because segment CRCs don't match (ignores data CRC)
+ tableDataManager.replaceSegmentIfCrcMismatch(segmentDataManager,
zkMetadata, new IndexLoadingConfig());
+
+ assertTrue(dataDir.exists());
+ assertEquals(new SegmentMetadataImpl(dataDir).getTotalDocs(), 5);
+ }
+
+ /**
+ * With useDataCrc enabled, matching segment CRCs must short-circuit the check (no download), even though
+ * the data CRCs differ (99999 in ZK vs 11111 locally).
+ */
+ @Test
+ public void
testReplaceSegmentIfCrcMismatchWhenFlagEnabledAndSegmentCrcMatchShouldNoDownload()
+ throws Exception {
+ // When flag is enabled and segment CRCs match, should not download
+ SegmentZKMetadata zkMetadata = createRawSegment(SegmentVersion.v3, 5);
+ long segmentCrc = zkMetadata.getCrc();
+ zkMetadata.setDataCrc(99999L);
+ zkMetadata.setUseDataCrc(true);
+
+ ImmutableSegmentDataManager segmentDataManager =
createImmutableSegmentDataManager(SEGMENT_NAME, segmentCrc);
+ SegmentMetadata segmentMetadata =
segmentDataManager.getSegment().getSegmentMetadata();
+ // Data CRCs deliberately differ, so passing relies on the segment-CRC match alone.
+ when(segmentMetadata.getDataCrc()).thenReturn("11111");
+
+ BaseTableDataManager tableDataManager = createTableManager();
+ File dataDir = tableDataManager.getSegmentDataDir(SEGMENT_NAME);
+
+ assertTrue(dataDir.mkdirs());
+
+ // Should NOT download because segment CRCs match
+ tableDataManager.replaceSegmentIfCrcMismatch(segmentDataManager,
zkMetadata, new IndexLoadingConfig());
+ // NOTE(review): no assertion follows, so this test only verifies that the call does not throw.
+ // Consider asserting that dataDir was left untouched (e.g. still empty) to prove no download happened.
+ }
+
+ /**
+ * Even with useDataCrc enabled, a negative (-1, i.e. unavailable) ZK data CRC disables the data-CRC path.
+ * A segment-CRC mismatch must therefore still trigger a download.
+ */
+ @Test
+ public void
testReplaceSegmentIfCrcMismatchWhenFlagEnabledAndSegmentCrcMismatchWithInvalidZkDataCrc()
+ throws Exception {
+ // When ZK data CRC is invalid (-1), should download if segment CRCs don't match
+ SegmentZKMetadata zkMetadata = createRawSegment(SegmentVersion.v3, 5);
+ zkMetadata.setCrc(2048L);
+ zkMetadata.setDataCrc(-1L);
+ zkMetadata.setUseDataCrc(true);
+
+ ImmutableSegmentDataManager segmentDataManager =
createImmutableSegmentDataManager(SEGMENT_NAME, 1024L);
+ SegmentMetadata segmentMetadata =
segmentDataManager.getSegment().getSegmentMetadata();
+ when(segmentMetadata.getDataCrc()).thenReturn("99999");
+
+ BaseTableDataManager tableDataManager = createTableManager();
+ File dataDir = tableDataManager.getSegmentDataDir(SEGMENT_NAME);
+ assertFalse(dataDir.exists());
+
+ // Should download because ZK data CRC is invalid
+ tableDataManager.replaceSegmentIfCrcMismatch(segmentDataManager,
zkMetadata, new IndexLoadingConfig());
+
+ assertTrue(dataDir.exists());
+ assertEquals(new SegmentMetadataImpl(dataDir).getTotalDocs(), 5);
+ }
+
// Has to be public class for the class loader to work.
public static class FakePinotCrypter implements PinotCrypter {
private File _origFile;
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 9888e32cc1d..8a71c568288 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -1811,6 +1811,7 @@ public class CommonConstants {
public static final String TOTAL_DOCS = "segment.total.docs";
public static final String CRC = "segment.crc";
public static final String DATA_CRC = "segment.data.crc";
+ // Boolean ZK simple field. When "true", a server deciding whether to replace its local segment copy may
+ // accept a matching data-only CRC (DATA_CRC) in place of a mismatched segment CRC. The key is absent when false.
+ public static final String USE_DATA_CRC = "segment.use.data.crc";
public static final String TIER = "segment.tier";
public static final String CREATION_TIME = "segment.creation.time";
public static final String PUSH_TIME = "segment.push.time";
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]