This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 0ea4d1f3ebc Deterministic data-only CRC (2/2) - use data-only CRC for realtime committing segments during replace (#17380)
0ea4d1f3ebc is described below

commit 0ea4d1f3ebca614b3bca98d1974a91cc1fb0f5e4
Author: Anurag Rai <[email protected]>
AuthorDate: Thu Jan 15 02:25:13 2026 +0530

    Deterministic data-only CRC (2/2) - use data-only CRC for realtime committing segments during replace (#17380)
---
 .../common/metadata/segment/SegmentZKMetadata.java | 16 +++++
 .../metadata/segment/SegmentZKMetadataUtils.java   |  8 ++-
 .../core/data/manager/BaseTableDataManager.java    | 10 +++-
 .../data/manager/BaseTableDataManagerTest.java     | 70 ++++++++++++++++++++++
 .../apache/pinot/spi/utils/CommonConstants.java    |  1 +
 5 files changed, 103 insertions(+), 2 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadata.java b/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadata.java
index 8938cd30809..730994cab78 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadata.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadata.java
@@ -182,6 +182,22 @@ public class SegmentZKMetadata implements ZKMetadata {
     setNonNegativeValue(Segment.DATA_CRC, dataCrc);
   }
 
+  public boolean isUseDataCrc() {
+    String useDataCrcString = _simpleFields.get(Segment.USE_DATA_CRC);
+    return Boolean.parseBoolean(useDataCrcString);
+  }
+
+  // useDataCrc is set for consuming segments in realtime table
+  // that signal replica server to use Data CRC when available for doing any replacement
+  // of segments
+  public void setUseDataCrc(boolean useDataCrc) {
+    if (useDataCrc) {
+      _simpleFields.put(Segment.USE_DATA_CRC, "true");
+    } else {
+      _simpleFields.remove(Segment.USE_DATA_CRC);
+    }
+  }
+
   public String getTier() {
     return _simpleFields.get(Segment.TIER);
   }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadataUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadataUtils.java
index 0c72fcd7600..9df510b3d54 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadataUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadataUtils.java
@@ -94,6 +94,10 @@ public class SegmentZKMetadataUtils {
 
       segmentZKMetadata.setEndOffset(endOffset);
       segmentZKMetadata.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
+      // for committing segments, we use data CRC to replace but only if the data CRC is present for the segment
+      if (Long.parseLong(segmentMetadata.getDataCrc()) >= 0) {
+        segmentZKMetadata.setUseDataCrc(true);
+      }
 
       // For committing segment, use current time as start/end time if total docs is 0
       if (segmentMetadata.getTotalDocs() > 0) {
@@ -110,7 +114,9 @@ public class SegmentZKMetadataUtils {
       segmentZKMetadata.setTimeUnit(TimeUnit.MILLISECONDS);
     } else {
       // For uploaded segment
-
+      // clear the use data crc flag for uploaded segments
+      // This flag should ONLY be true for segments committed from CONSUMING state
+      segmentZKMetadata.setUseDataCrc(false);
       // Set segment status, start/end offset info for real-time table
       if (TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
         segmentZKMetadata.setStatus(CommonConstants.Segment.Realtime.Status.UPLOADED);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index 29aadee1aa5..5694a9aaecc 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -1659,8 +1659,16 @@ public abstract class BaseTableDataManager implements TableDataManager {
     return segmentDirectoryLoader.load(indexDir.toURI(), loaderContext);
   }
 
+  // CRC check can be performed on both segment CRC and data CRC (if available) based on the ZK property value of
+  // useDataCRC.
   private static boolean hasSameCRC(SegmentZKMetadata zkMetadata, SegmentMetadata localMetadata) {
-    return zkMetadata.getCrc() == Long.parseLong(localMetadata.getCrc());
+    if (zkMetadata.getCrc() == Long.parseLong(localMetadata.getCrc())) {
+      return true;
+    }
+    return zkMetadata.isUseDataCrc()
+            && zkMetadata.getDataCrc() >= 0
+            && Long.parseLong(localMetadata.getDataCrc()) >= 0
+            && zkMetadata.getDataCrc() == Long.parseLong(localMetadata.getDataCrc());
   }
 
   private static void recoverReloadFailureQuietly(String tableNameWithType, String segmentName, File indexDir) {
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
index 034cea027b9..cf7053e985e 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
@@ -802,6 +802,76 @@ public class BaseTableDataManagerTest {
     }
   }
 
+  @Test
+  public void testReplaceSegmentIfCrcMismatchWhenFlagDisabledSegmentCrcMismatchShouldDownload()
+      throws Exception {
+    // When flag is disabled and segment CRCs don't match, should download
+    SegmentZKMetadata zkMetadata = createRawSegment(SegmentVersion.v3, 5);
+    zkMetadata.setCrc(2048L); // Different from local
+    zkMetadata.setDataCrc(99999L);
+    zkMetadata.setUseDataCrc(false);
+
+    ImmutableSegmentDataManager segmentDataManager = createImmutableSegmentDataManager(SEGMENT_NAME, 1024L);
+    SegmentMetadata segmentMetadata = segmentDataManager.getSegment().getSegmentMetadata();
+    when(segmentMetadata.getDataCrc()).thenReturn("99999");
+
+    BaseTableDataManager tableDataManager = createTableManager();
+    File dataDir = tableDataManager.getSegmentDataDir(SEGMENT_NAME);
+    assertFalse(dataDir.exists());
+
+    // Should download because segment CRCs don't match (ignores data CRC)
+    tableDataManager.replaceSegmentIfCrcMismatch(segmentDataManager, zkMetadata, new IndexLoadingConfig());
+
+    assertTrue(dataDir.exists());
+    assertEquals(new SegmentMetadataImpl(dataDir).getTotalDocs(), 5);
+  }
+
+  @Test
+  public void testReplaceSegmentIfCrcMismatchWhenFlagEnabledAndSegmentCrcMatchShouldNoDownload()
+      throws Exception {
+    // When flag is enabled and segment CRCs match, should not download
+    SegmentZKMetadata zkMetadata = createRawSegment(SegmentVersion.v3, 5);
+    long segmentCrc = zkMetadata.getCrc();
+    zkMetadata.setDataCrc(99999L);
+    zkMetadata.setUseDataCrc(true);
+
+    ImmutableSegmentDataManager segmentDataManager = createImmutableSegmentDataManager(SEGMENT_NAME, segmentCrc);
+    SegmentMetadata segmentMetadata = segmentDataManager.getSegment().getSegmentMetadata();
+    when(segmentMetadata.getDataCrc()).thenReturn("11111");
+
+    BaseTableDataManager tableDataManager = createTableManager();
+    File dataDir = tableDataManager.getSegmentDataDir(SEGMENT_NAME);
+
+    assertTrue(dataDir.mkdirs());
+
+    // Should NOT download because segment CRCs match
+    tableDataManager.replaceSegmentIfCrcMismatch(segmentDataManager, zkMetadata, new IndexLoadingConfig());
+  }
+
+  @Test
+  public void testReplaceSegmentIfCrcMismatchWhenFlagEnabledAndSegmentCrcMismatchWithInvalidZkDataCrc()
+      throws Exception {
+    // When ZK data CRC is invalid (-1), should download if segment CRCs don't match
+    SegmentZKMetadata zkMetadata = createRawSegment(SegmentVersion.v3, 5);
+    zkMetadata.setCrc(2048L);
+    zkMetadata.setDataCrc(-1L);
+    zkMetadata.setUseDataCrc(true);
+
+    ImmutableSegmentDataManager segmentDataManager = createImmutableSegmentDataManager(SEGMENT_NAME, 1024L);
+    SegmentMetadata segmentMetadata = segmentDataManager.getSegment().getSegmentMetadata();
+    when(segmentMetadata.getDataCrc()).thenReturn("99999");
+
+    BaseTableDataManager tableDataManager = createTableManager();
+    File dataDir = tableDataManager.getSegmentDataDir(SEGMENT_NAME);
+    assertFalse(dataDir.exists());
+
+    // Should download because ZK data CRC is invalid
+    tableDataManager.replaceSegmentIfCrcMismatch(segmentDataManager, zkMetadata, new IndexLoadingConfig());
+
+    assertTrue(dataDir.exists());
+    assertEquals(new SegmentMetadataImpl(dataDir).getTotalDocs(), 5);
+  }
+
   // Has to be public class for the class loader to work.
   public static class FakePinotCrypter implements PinotCrypter {
     private File _origFile;
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 9888e32cc1d..8a71c568288 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -1811,6 +1811,7 @@ public class CommonConstants {
     public static final String TOTAL_DOCS = "segment.total.docs";
     public static final String CRC = "segment.crc";
     public static final String DATA_CRC = "segment.data.crc";
+    public static final String USE_DATA_CRC = "segment.use.data.crc";
     public static final String TIER = "segment.tier";
     public static final String CREATION_TIME = "segment.creation.time";
     public static final String PUSH_TIME = "segment.push.time";


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to