This is an automated email from the ASF dual-hosted git repository.

KKcorps pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d0ead2f626c Skip local build on CRC mismatch during CONSUMING->ONLINE transition (#18895)
d0ead2f626c is described below

commit d0ead2f626c27034b9b0316721c96ac5c5367ddb
Author: Chaitanya Deepthi <[email protected]>
AuthorDate: Thu Jul 2 20:13:30 2026 -0700

    Skip local build on CRC mismatch during CONSUMING->ONLINE transition (#18895)
---
 .../realtime/RealtimeSegmentDataManager.java       |  48 +++++-
 .../realtime/RealtimeSegmentDataManagerTest.java   | 185 ++++++++++++++++++++-
 2 files changed, 229 insertions(+), 4 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index 0fbe75597d4..b060f2448a5 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -52,6 +52,7 @@ import 
org.apache.pinot.common.restlet.resources.SegmentErrorInfo;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.common.utils.PauselessConsumptionUtils;
 import org.apache.pinot.common.utils.TarCompressionUtils;
+import org.apache.pinot.core.data.manager.BaseTableDataManager;
 import org.apache.pinot.core.data.manager.SegmentOperationsTaskContext;
 import org.apache.pinot.core.data.manager.SegmentOperationsTaskType;
 import 
org.apache.pinot.core.data.manager.realtime.RealtimeConsumptionRateManager.ConsumptionRateLimiter;
@@ -72,6 +73,7 @@ import org.apache.pinot.segment.local.utils.TableConfigUtils;
 import org.apache.pinot.segment.spi.MutableSegment;
 import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.segment.spi.creator.SegmentVersion;
+import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.segment.spi.memory.PinotDataBufferMemoryManager;
 import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory;
 import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
@@ -1396,6 +1398,21 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
 
   protected boolean buildSegmentAndReplace()
       throws Exception {
+    return buildSegmentAndReplace(null);
+  }
+
+  /// Builds the segment from the in-memory rows and replaces the CONSUMING segment with the local copy.
+  ///
+  /// A locally-built segment can diverge in CRC from the one the winning replica committed (e.g. different docId
+  /// ordering); swapping in such a copy leaves replicas inconsistent and can corrupt upsert metadata. So when
+  /// `committedSegmentZKMetadata` carries a CRC, a mismatch discards the local build (returns `false`) and the
+  /// caller downloads the committed segment. The check is skipped for pauseless tables and for segments whose CRC
+  /// is not yet set (the COMMITTING window), so we never download a not-yet-uploaded segment (see PR #17885).
+  ///
+  /// @param committedSegmentZKMetadata committed ZK metadata carrying the CRC, or `null` to skip the check
+  /// @return `true` if built and replaced locally; `false` if the build failed or was rejected on a CRC mismatch
+  protected boolean buildSegmentAndReplace(@Nullable SegmentZKMetadata 
committedSegmentZKMetadata)
+      throws Exception {
     SegmentBuildDescriptor descriptor;
     try {
       descriptor = buildSegmentInternal(false);
@@ -1405,10 +1422,34 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
     if (descriptor == null) {
       return false;
     }
-    _realtimeTableDataManager.replaceConsumingSegment(_segmentNameStr, 
_segmentZKMetadata);
+    boolean crcCheckEnabled = committedSegmentZKMetadata != null && 
committedSegmentZKMetadata.getCrc() >= 0
+        && !PauselessConsumptionUtils.isPauselessEnabled(_tableConfig);
+    if (crcCheckEnabled && 
!isLocalSegmentCrcMatchingZk(committedSegmentZKMetadata)) {
+      _segmentLogger.warn("Locally-built segment: {} CRC does not match 
committed CRC: {} in zk. "
+          + "Skipping local build to replace", _segmentNameStr, 
committedSegmentZKMetadata.getCrc());
+      return false;
+    }
+    // Guard enabled (so CRCs matched): use the committed metadata; guard skipped: keep the construction-time metadata.
+    _realtimeTableDataManager.replaceConsumingSegment(_segmentNameStr,
+        crcCheckEnabled ? committedSegmentZKMetadata : _segmentZKMetadata);
     return true;
   }
 
+  /// Whether the just-built local segment's CRC matches the committed CRC in ZK. On a mismatch the local CRC is logged
+  /// next to the committed one, because the caller's warning only reports the committed CRC. Returns `false` if the
+  /// local metadata cannot be read, so the caller downloads the committed segment rather than failing the transition.
+  ///
+  /// @param committedSegmentZKMetadata committed ZK metadata whose CRC the local build is compared against
+  /// @return `true` only if the locally-built segment's metadata was read and its CRC matches
+  @VisibleForTesting
+  protected boolean isLocalSegmentCrcMatchingZk(SegmentZKMetadata committedSegmentZKMetadata) {
+    File indexDir = new File(_resourceDataDir, _segmentNameStr);
+    try {
+      SegmentMetadataImpl localMetadata = new SegmentMetadataImpl(indexDir);
+      if (BaseTableDataManager.hasSameCRC(committedSegmentZKMetadata, localMetadata)) {
+        return true;
+      }
+      _segmentLogger.info("Locally-built segment: {} has CRC: {} while committed CRC in zk is: {}", _segmentNameStr,
+          localMetadata.getCrc(), committedSegmentZKMetadata.getCrc());
+      return false;
+    } catch (Exception e) {
+      // Best-effort: any failure to read/compare is treated as a mismatch so the committed copy is downloaded.
+      _segmentLogger.warn("Failed to read CRC of locally-built segment: {}; treating as CRC mismatch to download the "
+          + "committed segment", _segmentNameStr, e);
+      return false;
+    }
+  }
+
   private void closeStreamConsumer() {
     if (_streamConsumerClosed.compareAndSet(false, true)) {
       closePartitionGroupConsumer();
@@ -1609,7 +1650,7 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
           } else if (_currentOffset.compareTo(endOffset) == 0) {
             _segmentLogger.info("Current offset {} matches offset in zk {}. 
Replacing segment", _currentOffset,
                 endOffset);
-            if (!buildSegmentAndReplace()) {
+            if (!buildSegmentAndReplace(segmentZKMetadata)) {
               _segmentLogger.warn("Failed to build the segment: {} and 
replace. Downloading to replace",
                   _segmentNameStr);
               downloadSegmentAndReplace(segmentZKMetadata);
@@ -1630,7 +1671,8 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
 
             if (success) {
               _segmentLogger.info("Caught up to offset {}", _currentOffset);
-              if (!buildSegmentAndReplace()) {
+              // After catching up the offset matches zk; apply the same CRC 
guard as the matched-offset case.
+              if (!buildSegmentAndReplace(segmentZKMetadata)) {
                 _segmentLogger.warn("Failed to build the segment: {} after 
catchup. Downloading to replace",
                     _segmentNameStr);
                 downloadSegmentAndReplace(segmentZKMetadata);
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
index 41fb745acd4..92a6dad4763 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
@@ -26,6 +26,7 @@ import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.time.Instant;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
@@ -48,13 +49,19 @@ import 
org.apache.pinot.core.realtime.impl.fakestream.FakeStreamMessageDecoder;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
 import 
org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentStatsHistory;
 import org.apache.pinot.segment.local.segment.creator.Fixtures;
+import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
 import org.apache.pinot.segment.local.utils.SegmentLocks;
 import org.apache.pinot.segment.local.utils.ServerReloadJobStatusCache;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig;
 import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.metrics.PinotMetricUtils;
 import org.apache.pinot.spi.stream.LongMsgOffset;
@@ -70,6 +77,9 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.same;
 import static org.mockito.Mockito.anyString;
 import static org.mockito.Mockito.atLeast;
 import static org.mockito.Mockito.mock;
@@ -600,6 +610,159 @@ public class RealtimeSegmentDataManagerTest {
     }
   }
 
+  /// Covers the CRC guard on CONSUMING -> ONLINE when the local offset equals the committed end offset:
+  /// - CRC set, local CRC differs (upsert or not): local build discarded, committed segment downloaded.
+  /// - CRC set, local CRC matches (upsert or not): local build kept, committed metadata swapped in.
+  /// - CRC unset, or pauseless table: guard bypassed, local build kept with construction-time metadata.
+  @Test
+  public void testOnlineTransitionSkipsLocalBuildOnCrcMismatch()
+      throws Exception {
+    long finalOffsetValue = START_OFFSET_VALUE + 600;
+
+    // Committed CRC set, local CRC diverges -> local build discarded, committed segment downloaded.
+    for (boolean upsertEnabled : new boolean[]{true, false}) {
+      try (FakeRealtimeSegmentDataManager manager = createFakeSegmentManager()) {
+        when(manager._tableDataManager.isUpsertEnabled()).thenReturn(upsertEnabled);
+        runGoOnlineForCrcGuard(manager, crcMetadata(finalOffsetValue, 12345L), false, finalOffsetValue);
+        Assert.assertTrue(manager._buildAndReplaceCalled);
+        Assert.assertTrue(manager._downloadAndReplaceCalled);
+      }
+    }
+
+    // Committed CRC set, local CRC matches -> local build kept, no download, committed metadata swapped in.
+    for (boolean upsertEnabled : new boolean[]{true, false}) {
+      try (FakeRealtimeSegmentDataManager manager = createFakeSegmentManager()) {
+        when(manager._tableDataManager.isUpsertEnabled()).thenReturn(upsertEnabled);
+        SegmentZKMetadata committedMetadata = crcMetadata(finalOffsetValue, 12345L);
+        runGoOnlineForCrcGuard(manager, committedMetadata, true, finalOffsetValue);
+        Assert.assertTrue(manager._buildAndReplaceCalled);
+        Assert.assertFalse(manager._downloadAndReplaceCalled);
+        verify(manager._tableDataManager).replaceConsumingSegment(eq(SEGMENT_NAME_STR), same(committedMetadata));
+      }
+    }
+
+    // Committed CRC unset (-1) -> guard skipped, local build kept with construction-time metadata.
+    try (FakeRealtimeSegmentDataManager manager = createFakeSegmentManager()) {
+      when(manager._tableDataManager.isUpsertEnabled()).thenReturn(false);
+      SegmentZKMetadata committedMetadata = crcMetadata(finalOffsetValue, -1L);
+      runGoOnlineForCrcGuard(manager, committedMetadata, false, finalOffsetValue);
+      assertKeptLocalBuildWithOwnMetadata(manager, committedMetadata);
+    }
+
+    // Pauseless table -> guard skipped even though a CRC is set; local build kept with construction-time metadata.
+    try (FakeRealtimeSegmentDataManager manager =
+        createFakeSegmentManager(false, new TimeSupplier(), null, null, createPauselessTableConfig())) {
+      when(manager._tableDataManager.isUpsertEnabled()).thenReturn(false);
+      SegmentZKMetadata committedMetadata = crcMetadata(finalOffsetValue, 12345L);
+      runGoOnlineForCrcGuard(manager, committedMetadata, false, finalOffsetValue);
+      assertKeptLocalBuildWithOwnMetadata(manager, committedMetadata);
+    }
+  }
+
+  /// Asserts the local build was swapped in without a download, using metadata other than `committedMetadata`.
+  private void assertKeptLocalBuildWithOwnMetadata(FakeRealtimeSegmentDataManager manager,
+      SegmentZKMetadata committedMetadata) {
+    Assert.assertTrue(manager._buildAndReplaceCalled);
+    Assert.assertFalse(manager._downloadAndReplaceCalled);
+    verify(manager._tableDataManager)
+        .replaceConsumingSegment(eq(SEGMENT_NAME_STR), argThat(m -> m != committedMetadata));
+  }
+
+  /// Drives `goOnlineFromConsuming` through the real build-and-replace path with a canned CRC verdict.
+  ///
+  /// The manager is parked in HOLDING at `offsetValue`. Callers pass the committed end offset, so the current offset
+  /// matches ZK and the segment is expected to be built locally.
+  private void runGoOnlineForCrcGuard(FakeRealtimeSegmentDataManager manager, SegmentZKMetadata zkMetadata,
+      boolean crcMatches, long offsetValue)
+      throws Exception {
+    // Use the production buildSegmentAndReplace, but stub the CRC comparison result.
+    manager._localSegmentCrcMatchesZk = crcMatches;
+    manager._useRealBuildAndReplace = true;
+
+    // Park the manager in HOLDING at the requested offset with the consumer semaphore held and no stop wait.
+    manager._stopWaitTimeMs = 0;
+    manager.getConsumerSemaphoreAcquired().set(true);
+    manager._state.set(manager, RealtimeSegmentDataManager.State.HOLDING);
+    manager.setCurrentOffset(offsetValue);
+
+    manager.goOnlineFromConsuming(zkMetadata);
+  }
+
+  /// Builds committed ZK metadata for `SEGMENT_NAME_STR` ending at `endOffsetValue`. A negative `crc` leaves the CRC
+  /// unset, mimicking a segment whose CRC has not been written yet.
+  private SegmentZKMetadata crcMetadata(long endOffsetValue, long crc) {
+    SegmentZKMetadata zkMetadata = new SegmentZKMetadata(SEGMENT_NAME_STR);
+    zkMetadata.setEndOffset(new LongMsgOffset(endOffsetValue).toString());
+    if (crc < 0) {
+      return zkMetadata;
+    }
+    zkMetadata.setCrc(crc);
+    return zkMetadata;
+  }
+
+  /// Returns the default test table config with pauseless consumption enabled on its stream ingestion config.
+  private TableConfig createPauselessTableConfig()
+      throws Exception {
+    TableConfig pauselessConfig = createTableConfig();
+    IngestionConfig ingestion = pauselessConfig.getIngestionConfig();
+    if (ingestion == null) {
+      ingestion = new IngestionConfig();
+      pauselessConfig.setIngestionConfig(ingestion);
+    }
+    StreamIngestionConfig streamIngestion =
+        new StreamIngestionConfig(List.of(pauselessConfig.getIndexingConfig().getStreamConfigs()));
+    streamIngestion.setPauselessConsumptionEnabled(true);
+    ingestion.setStreamIngestionConfig(streamIngestion);
+    return pauselessConfig;
+  }
+
+  // Exercises the real CRC comparison (on-disk SegmentMetadataImpl read + hasSameCRC) rather than the stubbed branch.
+  // The segment built for the check is deleted afterwards so it cannot leak into other tests that share TEMP_DIR.
+  @Test
+  public void testIsLocalSegmentCrcMatchingZk()
+      throws Exception {
+    File resourceDir = new File(TEMP_DIR, REALTIME_TABLE_NAME);
+    try (FakeRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager()) {
+      segmentDataManager._useRealCrcCheck = true;
+      long localCrc = buildRealSegmentAndGetCrc(resourceDir, SEGMENT_NAME_STR);
+
+      SegmentZKMetadata matchingMetadata = new SegmentZKMetadata(SEGMENT_NAME_STR);
+      matchingMetadata.setCrc(localCrc);
+      Assert.assertTrue(segmentDataManager.isLocalSegmentCrcMatchingZk(matchingMetadata));
+
+      SegmentZKMetadata mismatchingMetadata = new SegmentZKMetadata(SEGMENT_NAME_STR);
+      mismatchingMetadata.setCrc(localCrc + 1);
+      Assert.assertFalse(segmentDataManager.isLocalSegmentCrcMatchingZk(mismatchingMetadata));
+    } finally {
+      FileUtils.deleteQuietly(new File(resourceDir, SEGMENT_NAME_STR));
+    }
+  }
+
+  // A locally-built segment that cannot be read (e.g. missing or corrupt) must be reported as a CRC mismatch, so the
+  // ONLINE transition downloads the committed segment instead of throwing.
+  @Test
+  public void testIsLocalSegmentCrcMatchingZkTreatsUnreadableSegmentAsMismatch()
+      throws Exception {
+    try (FakeRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager()) {
+      segmentDataManager._useRealCrcCheck = true;
+      // Remove anything left at the local segment path so the metadata read is guaranteed to fail.
+      File resourceDir = new File(TEMP_DIR, REALTIME_TABLE_NAME);
+      FileUtils.deleteQuietly(new File(resourceDir, SEGMENT_NAME_STR));
+
+      SegmentZKMetadata committedMetadata = new SegmentZKMetadata(SEGMENT_NAME_STR);
+      committedMetadata.setCrc(12345L);
+      Assert.assertFalse(segmentDataManager.isLocalSegmentCrcMatchingZk(committedMetadata));
+    }
+  }
+
+  /// Builds a two-row segment named `segmentName` under `resourceDir` and returns the CRC recorded in its on-disk
+  /// metadata.
+  private long buildRealSegmentAndGetCrc(File resourceDir, String segmentName)
+      throws Exception {
+    Schema testSchema = Fixtures.createSchema();
+    SegmentGeneratorConfig segmentConfig = new SegmentGeneratorConfig(createTableConfig(), testSchema);
+    segmentConfig.setOutDir(resourceDir.getAbsolutePath());
+    segmentConfig.setSegmentName(segmentName);
+
+    List<GenericRow> inputRows = List.of(Fixtures.createSingleRow(1L), Fixtures.createSingleRow(2L));
+    try (GenericRowRecordReader rowReader = new GenericRowRecordReader(inputRows)) {
+      SegmentIndexCreationDriverImpl creationDriver = new SegmentIndexCreationDriverImpl();
+      creationDriver.init(segmentConfig, rowReader);
+      creationDriver.build();
+    }
+
+    SegmentMetadataImpl builtMetadata = new SegmentMetadataImpl(new File(resourceDir, segmentName));
+    return Long.parseLong(builtMetadata.getCrc());
+  }
+
   @Test
   public void testEndCriteriaChecking()
       throws Exception {
@@ -1166,6 +1329,12 @@ public class RealtimeSegmentDataManagerTest {
     public Field _stopReason;
     public Field _segmentBuildFailedWithDeterministicError;
     public boolean _failSegmentBuildAndReplace = false;
+    // When set, buildSegmentAndReplace runs the real implementation (including the CRC guard, which applies to upsert
+    // and non-upsert tables alike) instead of being short-circuited.
+    public boolean _useRealBuildAndReplace = false;
+    // Canned result returned by isLocalSegmentCrcMatchingZk whenever _useRealCrcCheck is not set.
+    public boolean _localSegmentCrcMatchesZk = true;
+    // When set, isLocalSegmentCrcMatchingZk runs the real on-disk metadata read + CRC comparison.
+    public boolean _useRealCrcCheck = false;
     private Field _streamMsgOffsetFactory;
     public LinkedList<LongMsgOffset> _consumeOffsets = new LinkedList<>();
     public LinkedList<SegmentCompletionProtocol.Response> _responses = new 
LinkedList<>();
@@ -1180,6 +1349,7 @@ public class RealtimeSegmentDataManagerTest {
     public boolean _postConsumeStoppedCalled = false;
     public Map<Integer, ConsumerCoordinator> _consumerCoordinatorMap;
     public boolean _stubConsumeLoop = true;
+    // Mocked table data manager passed to the constructor; exposed so tests can stub and verify calls on it.
+    public RealtimeTableDataManager _tableDataManager;
     private TimeSupplier _timeSupplier;
     private boolean _indexCapacityThresholdBreached;
 
@@ -1216,6 +1386,7 @@ public class RealtimeSegmentDataManagerTest {
       _streamMsgOffsetFactory.setAccessible(true);
       _streamMsgOffsetFactory.set(this, new LongMsgOffsetFactory());
       _timeSupplier = timeSupplier;
+      _tableDataManager = realtimeTableDataManager;
     }
 
     public String getStopReason() {
@@ -1317,12 +1488,24 @@ public class RealtimeSegmentDataManagerTest {
     }
 
     @Override
-    protected boolean buildSegmentAndReplace() {
+    protected boolean buildSegmentAndReplace(SegmentZKMetadata committedSegmentZKMetadata)
+        throws Exception {
       terminateLoopIfNecessary();
       _buildAndReplaceCalled = true;
-      return !_failSegmentBuildAndReplace;
+      // Opt-in delegation to the production path (CRC guard included); otherwise report the canned build outcome.
+      return _useRealBuildAndReplace ? super.buildSegmentAndReplace(committedSegmentZKMetadata)
+          : !_failSegmentBuildAndReplace;
     }
 
+    @Override
+    protected boolean isLocalSegmentCrcMatchingZk(SegmentZKMetadata committedSegmentZKMetadata) {
+      // Either defer to the real on-disk comparison or return the canned verdict configured by the test.
+      return _useRealCrcCheck ? super.isLocalSegmentCrcMatchingZk(committedSegmentZKMetadata)
+          : _localSegmentCrcMatchesZk;
+    }
+
     @Override
     protected SegmentBuildDescriptor buildSegmentInternal(boolean forCommit) {
       terminateLoopIfNecessary();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to