Copilot commented on code in PR #17324:
URL: https://github.com/apache/pinot/pull/17324#discussion_r2654806981


##########
pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java:
##########
@@ -83,18 +95,38 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
   private static final String DELETE_RECORD_COLUMN = "deleteCol";
   private static final File INDEX_DIR =
       new File(FileUtils.getTempDirectory(), "ConcurrentMapPartitionUpsertMetadataManagerTest");
+  private static final File SEGMENT_DIR = new File(INDEX_DIR, "segments");
+
+  private static final int MOCK_FALLBACK_BASE_OFFSET = 1000;

Review Comment:
   The constant name `MOCK_FALLBACK_BASE_OFFSET` is ambiguous. Consider 
renaming it to `MOCK_DEFAULT_PRIMARY_KEY_BASE` or 
`MOCK_FALLBACK_PRIMARY_KEY_OFFSET` to clarify that it's used as a fallback for 
generating primary key values in mock scenarios.
   ```suggestion
     private static final int MOCK_FALLBACK_PRIMARY_KEY_OFFSET = 1000;
   ```



##########
pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java:
##########
@@ -808,26 +851,127 @@ private List<PrimaryKey> getPrimaryKeyList(int numRecords, int[] primaryKeys) {
   private static ImmutableSegmentImpl mockImmutableSegment(int sequenceNumber,
       ThreadSafeMutableRoaringBitmap validDocIds, @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds,
       List<PrimaryKey> primaryKeys) {
+    return mockImmutableSegmentWithTimestamps(sequenceNumber, validDocIds, queryableDocIds, primaryKeys, null);
+  }
+
+  private static ImmutableSegmentImpl mockImmutableSegmentWithTimestamps(int sequenceNumber,
+      ThreadSafeMutableRoaringBitmap validDocIds, @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds,
+      List<PrimaryKey> primaryKeys, @Nullable int[] timestamps) {
     ImmutableSegmentImpl segment = mock(ImmutableSegmentImpl.class);
     when(segment.getSegmentName()).thenReturn(getSegmentName(sequenceNumber));
     when(segment.getValidDocIds()).thenReturn(validDocIds);
     when(segment.getQueryableDocIds()).thenReturn(queryableDocIds);
-    DataSource dataSource = mock(DataSource.class);
-    when(segment.getDataSource(anyString())).thenReturn(dataSource);
-    ForwardIndexReader forwardIndex = mock(ForwardIndexReader.class);
-    when(forwardIndex.isSingleValue()).thenReturn(true);
-    when(forwardIndex.getStoredType()).thenReturn(DataType.INT);
-    when(forwardIndex.getInt(anyInt(), any())).thenAnswer(
-        invocation -> primaryKeys.get(invocation.getArgument(0)).getValues()[0]);
-    when(dataSource.getForwardIndex()).thenReturn(forwardIndex);
+
+    // Enhanced mocking for RecordInfoReader to work properly
+    // Mock primary key column data source
+    DataSource primaryKeyDataSource = mock(DataSource.class);
+    ForwardIndexReader primaryKeyForwardIndex = mock(ForwardIndexReader.class);
+    when(primaryKeyForwardIndex.isSingleValue()).thenReturn(true);
+    when(primaryKeyForwardIndex.getStoredType()).thenReturn(DataType.INT);
+    when(primaryKeyForwardIndex.createContext()).thenReturn(null);
+    when(primaryKeyForwardIndex.getInt(anyInt(), any())).thenAnswer(invocation -> {
+      int docId = invocation.getArgument(0);
+      if (primaryKeys != null && docId < primaryKeys.size()) {
+        return (Integer) primaryKeys.get(docId).getValues()[0];
+      }
+      return MOCK_FALLBACK_BASE_OFFSET + docId;
+    });
+    when(primaryKeyDataSource.getForwardIndex()).thenReturn(primaryKeyForwardIndex);
+
+    // Mock comparison column data source
+    DataSource comparisonDataSource = mock(DataSource.class);
+    ForwardIndexReader comparisonForwardIndex = mock(ForwardIndexReader.class);
+    when(comparisonForwardIndex.isSingleValue()).thenReturn(true);
+    when(comparisonForwardIndex.getStoredType()).thenReturn(DataType.INT);
+    when(comparisonForwardIndex.createContext()).thenReturn(null);
+    when(comparisonForwardIndex.getInt(anyInt(), any())).thenAnswer(invocation -> {
+      int docId = invocation.getArgument(0);
+      // Return actual timestamp values if provided, otherwise default values
+      if (timestamps != null && docId < timestamps.length) {
+        return timestamps[docId];
+      }
+      return MOCK_FALLBACK_BASE_OFFSET + (docId * 100);
+    });
+    when(comparisonDataSource.getForwardIndex()).thenReturn(comparisonForwardIndex);
+
+    // Set up data source mapping - IMPORTANT: anyString() must be registered FIRST,
+    // then specific matchers override it (Mockito uses last matching stub)

Review Comment:
   This comment is correct but easy to misread. When more than one stub
matches the same call, Mockito answers with the most recently registered stub;
it does not rank matchers by specificity. The code therefore depends on the
registration order (anyString() fallback first, specific matchers afterwards),
and the comment could spell out why that order is required.
   ```suggestion
       // Set up data source mapping: register the generic anyString() fallback FIRST,
       // then the specific matchers (Mockito uses the most recently registered matching stub)
   ```
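The ordering rule can be illustrated without Mockito itself. The sketch below is a hypothetical, simplified model (the `LastStubWinsModel` class and its `stub`/`invoke` methods are illustrative names, not Mockito API): it searches registered stubs from newest to oldest, which mirrors Mockito's documented "last stubbing is more important" behavior when several stubs match the same call. The catch-all predicate rejects `null` to imitate `anyString()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/**
 * Hypothetical model of Mockito's stub-resolution order (not Mockito's actual code):
 * when several stubs match an argument, the most recently registered stub wins.
 */
public class LastStubWinsModel {
  /** One registered stub: an argument matcher plus the value it answers with. */
  private static final class Stub {
    final Predicate<String> matcher;
    final String answer;

    Stub(Predicate<String> matcher, String answer) {
      this.matcher = matcher;
      this.answer = answer;
    }
  }

  private final List<Stub> stubs = new ArrayList<>();

  /** Registers a stub; later registrations take precedence over earlier ones. */
  public void stub(Predicate<String> matcher, String answer) {
    stubs.add(new Stub(matcher, answer));
  }

  /** Returns the answer of the newest matching stub, or null if nothing matches. */
  public String invoke(String argument) {
    for (int i = stubs.size() - 1; i >= 0; i--) {
      Stub candidate = stubs.get(i);
      if (candidate.matcher.test(argument)) {
        return candidate.answer;
      }
    }
    return null;
  }

  public static void main(String[] args) {
    // Correct order: catch-all first (like anyString(), which rejects null), specific matcher second.
    LastStubWinsModel correct = new LastStubWinsModel();
    correct.stub(arg -> arg != null, "fallbackDataSource");
    correct.stub("pkCol"::equals, "primaryKeyDataSource");
    System.out.println(correct.invoke("pkCol"));    // primaryKeyDataSource
    System.out.println(correct.invoke("otherCol")); // fallbackDataSource

    // Reversed order: the catch-all is newest, so it shadows the specific matcher.
    LastStubWinsModel reversed = new LastStubWinsModel();
    reversed.stub("pkCol"::equals, "primaryKeyDataSource");
    reversed.stub(arg -> arg != null, "fallbackDataSource");
    System.out.println(reversed.invoke("pkCol"));   // fallbackDataSource
  }
}
```

The reversed case is the failure mode the registration order in `mockImmutableSegmentWithTimestamps` avoids: a catch-all registered last would hide every specific column stub.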



##########
pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java:
##########
@@ -1724,4 +1903,414 @@ public int hashCode() {
       return _value;
     }
   }
+
+  // Tests for upsert metadata reversion functionality
+  @Test
+  public void testPartialUpsertSameDocsReplacement() throws IOException {
+    // Test partial upserts with old and new segments having same number of docs
+    // This test verifies that when all keys are present, no reversion occurs
+    PartialUpsertHandler mockPartialUpsertHandler = mock(PartialUpsertHandler.class);
+    UpsertContext upsertContext = _contextBuilder.setPartialUpsertHandler(mockPartialUpsertHandler)
+        .setConsistencyMode(UpsertConfig.ConsistencyMode.NONE).build();
+
+    ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+        new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, upsertContext);
+
+    int numRecords1 = 3;
+    int[] primaryKeys1 = new int[]{1, 2, 3};
+    int[] timestamps1 = new int[]{100, 200, 300};
+    ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
+    for (int i = 0; i < numRecords1; i++) {
+      validDocIds1.add(i);
+    }
+    List<PrimaryKey> primaryKeysList1 = getPrimaryKeyList(numRecords1, primaryKeys1);
+    ImmutableSegmentImpl segment1 = mockImmutableSegmentWithTimestamps(1, validDocIds1, null,
+        primaryKeysList1, timestamps1);
+    List<RecordInfo> recordInfoList1 = getRecordInfoListWithIntegerComparison(numRecords1, primaryKeys1,
+        timestamps1, null);
+
+    upsertMetadataManager.addSegment(segment1, validDocIds1, null, recordInfoList1.iterator());
+
+    Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
+    assertEquals(recordLocationMap.size(), 3);
+    checkRecordLocation(recordLocationMap, 1, segment1, 0, 100, HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 2, segment1, 1, 200, HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 3, segment1, 2, 300, HashFunction.NONE);
+
+    // Create new segment with same 3 records but updated timestamps
+    int numRecords2 = 3;
+    int[] primaryKeys2 = new int[]{1, 2, 3};
+    int[] timestamps2 = new int[]{150, 250, 350};
+    ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
+    for (int i = 0; i < numRecords2; i++) {
+      validDocIds2.add(i);
+    }
+    List<PrimaryKey> primaryKeysList2 = getPrimaryKeyList(numRecords2, primaryKeys2);
+    ImmutableSegmentImpl segment2 = mockImmutableSegmentWithTimestamps(1, validDocIds2, null,
+        primaryKeysList2, timestamps2);
+    List<RecordInfo> recordInfoList2 = getRecordInfoListWithIntegerComparison(numRecords2, primaryKeys2,
+        timestamps2, null);
+
+    // Replace segment - should trigger reversion logic but no reversion needed since all keys are present
+    upsertMetadataManager.replaceSegment(segment2, segment1);
+
+    // Verify replacement - all records should be in new segment
+    assertEquals(recordLocationMap.size(), 3);
+    checkRecordLocation(recordLocationMap, 1, segment2, 0, 150, HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 2, segment2, 1, 250, HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 3, segment2, 2, 350, HashFunction.NONE);
+
+    // New segment should have all docs valid
+    assertEquals(segment2.getValidDocIds().getMutableRoaringBitmap().getCardinality(), 3);
+
+    upsertMetadataManager.stop();
+    upsertMetadataManager.close();
+  }
+
+  @Test
+  public void testPartialUpsertOldSegmentTriggerReversion() throws IOException {
+    // Test partial upserts with consuming (mutable) segment being sealed - revert should be triggered
+    // Note: Revert logic only applies when sealing a consuming segment, not for immutable segment replacement
+    PartialUpsertHandler mockPartialUpsertHandler = mock(PartialUpsertHandler.class);
+    UpsertContext upsertContext = _contextBuilder.setPartialUpsertHandler(mockPartialUpsertHandler)
+        .setConsistencyMode(UpsertConfig.ConsistencyMode.NONE).build();
+
+    ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+        new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, upsertContext);
+
+    // Create a mutable (consuming) segment with 4 records - use mockMutableSegmentWithDataSource
+    // to support removeSegment which needs to read primary keys
+    int[] mutablePrimaryKeys = new int[]{1, 2, 3, 4};
+    ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
+    MutableSegment mutableSegment = mockMutableSegmentWithDataSource(1, validDocIds1, null, mutablePrimaryKeys);
+
+    // Add records to the mutable segment
+    upsertMetadataManager.addRecord(mutableSegment, new RecordInfo(makePrimaryKey(1), 0, 100, false));
+    upsertMetadataManager.addRecord(mutableSegment, new RecordInfo(makePrimaryKey(2), 1, 200, false));
+    upsertMetadataManager.addRecord(mutableSegment, new RecordInfo(makePrimaryKey(3), 2, 300, false));
+    upsertMetadataManager.addRecord(mutableSegment, new RecordInfo(makePrimaryKey(4), 3, 400, false));
+
+    Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
+    assertEquals(recordLocationMap.size(), 4);
+    assertEquals(validDocIds1.getMutableRoaringBitmap().getCardinality(), 4);
+
+    int numRecords2 = 2;
+    int[] primaryKeys2 = new int[]{1, 3};
+    int[] timestamps2 = new int[]{150, 350};
+    ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
+    List<PrimaryKey> primaryKeysList2 = getPrimaryKeyList(numRecords2, primaryKeys2);
+    ImmutableSegmentImpl segment2 = mockImmutableSegmentWithTimestamps(1, validDocIds2, null,
+        primaryKeysList2, timestamps2);
+
+    // Replace mutable with immutable (consuming segment seal) - revert SHOULD be triggered
+    upsertMetadataManager.replaceSegment(segment2, validDocIds2, null,
+        getRecordInfoListWithIntegerComparison(numRecords2, primaryKeys2, timestamps2, null).iterator(),
+        mutableSegment);
+
+    assertEquals(recordLocationMap.size(), 2);
+    assertEquals(segment2.getValidDocIds().getMutableRoaringBitmap().getCardinality(), 2);
+    checkRecordLocation(recordLocationMap, 1, segment2, 0, 150, HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 3, segment2, 1, 350, HashFunction.NONE);
+
+    // Mutable segment's validDocIds should be 0 after removal
+    assertEquals(validDocIds1.getMutableRoaringBitmap().getCardinality(), 0);
+    upsertMetadataManager.stop();
+    upsertMetadataManager.close();
+  }
+
+  @Test
+  public void testPartialUpsertOldSegmentLesserDocs() throws IOException {
+    // Test partial upserts with old segment having fewer docs than new segment
+    PartialUpsertHandler mockPartialUpsertHandler = mock(PartialUpsertHandler.class);
+    UpsertContext upsertContext = _contextBuilder.setPartialUpsertHandler(mockPartialUpsertHandler)
+        .setConsistencyMode(UpsertConfig.ConsistencyMode.NONE).build();
+
+    ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+        new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, upsertContext);
+
+    int numRecords1 = 2;
+    int[] primaryKeys1 = new int[]{1, 2};
+    int[] timestamps1 = new int[]{100, 200};
+    ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
+    for (int i = 0; i < numRecords1; i++) {
+      validDocIds1.add(i);
+    }
+    List<PrimaryKey> primaryKeysList1 = getPrimaryKeyList(numRecords1, primaryKeys1);
+    ImmutableSegmentImpl segment1 =
+        mockImmutableSegmentWithTimestamps(1, validDocIds1, null, primaryKeysList1, timestamps1);
+    List<RecordInfo> recordInfoList1 =
+        getRecordInfoListWithIntegerComparison(numRecords1, primaryKeys1, timestamps1, null);
+
+    upsertMetadataManager.addSegment(segment1, validDocIds1, null, recordInfoList1.iterator());
+
+    Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
+    assertEquals(recordLocationMap.size(), 2);
+
+    int numRecords2 = 4;
+    int[] primaryKeys2 = new int[]{1, 2, 3, 4};
+    int[] timestamps2 = new int[]{150, 250, 300, 400};
+    ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
+    for (int i = 0; i < numRecords2; i++) {
+      validDocIds2.add(i);
+    }
+    List<PrimaryKey> primaryKeysList2 = getPrimaryKeyList(numRecords2, primaryKeys2);
+    ImmutableSegmentImpl segment2 = mockImmutableSegmentWithTimestamps(1, validDocIds2, null,
+        primaryKeysList2, timestamps2);
+    upsertMetadataManager.replaceSegment(segment2, segment1);
+
+    // Verify state after replacement - all records should be in new segment
+    assertEquals(recordLocationMap.size(), 4);
+    checkRecordLocation(recordLocationMap, 1, segment2, 0, 150, HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 2, segment2, 1, 250, HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 3, segment2, 2, 300, HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 4, segment2, 3, 400, HashFunction.NONE);
+
+    // New segment should have all docs valid
+    assertEquals(segment2.getValidDocIds().getMutableRoaringBitmap().getCardinality(), 4);
+
+    upsertMetadataManager.stop();
+    upsertMetadataManager.close();
+  }
+
+  @Test
+  public void testFullUpsertConsistencyNoneSameDocs() throws IOException {
+    // Test full upserts with consistency=NONE and same number of docs
+    UpsertContext upsertContext = _contextBuilder
+        .setConsistencyMode(UpsertConfig.ConsistencyMode.NONE)
+        .setDropOutOfOrderRecord(true)
+        .build();
+
+    ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+        new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, upsertContext);
+
+    // Create old segment with 3 records
+    int numRecords1 = 3;
+    int[] primaryKeys1 = new int[]{10, 20, 30};
+    int[] timestamps1 = new int[]{1000, 2000, 3000};
+    ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
+    for (int i = 0; i < numRecords1; i++) {
+      validDocIds1.add(i);
+    }
+    List<PrimaryKey> primaryKeysList1 = getPrimaryKeyList(numRecords1, primaryKeys1);
+    ImmutableSegmentImpl segment1 = mockImmutableSegmentWithTimestamps(1, validDocIds1, null,
+        primaryKeysList1, timestamps1);
+    List<RecordInfo> recordInfoList1 = getRecordInfoListWithIntegerComparison(numRecords1, primaryKeys1,
+        timestamps1, null);
+
+    upsertMetadataManager.addSegment(segment1, validDocIds1, null, recordInfoList1.iterator());
+    Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
+    assertEquals(recordLocationMap.size(), 3);
+
+    int numRecords2 = 3;
+    int[] primaryKeys2 = new int[]{10, 20, 30};
+    int[] timestamps2 = new int[]{1500, 2500, 3500};
+    ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
+    for (int i = 0; i < numRecords2; i++) {
+      validDocIds2.add(i);
+    }
+    List<PrimaryKey> primaryKeysList2 = getPrimaryKeyList(numRecords2, primaryKeys2);
+    ImmutableSegmentImpl segment2 = mockImmutableSegmentWithTimestamps(1, validDocIds2, null,
+        primaryKeysList2, timestamps2);
+    upsertMetadataManager.replaceSegment(segment2, segment1);
+    assertEquals(recordLocationMap.size(), 3);
+    checkRecordLocation(recordLocationMap, 10, segment2, 0, 1500, HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 20, segment2, 1, 2500, HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 30, segment2, 2, 3500, HashFunction.NONE);
+    assertEquals(segment2.getValidDocIds().getMutableRoaringBitmap().getCardinality(), 3);
+
+    upsertMetadataManager.stop();
+    upsertMetadataManager.close();
+  }
+
+  @Test
+  public void testFullUpsertConsistencyNoneOldSegmentMoreDocs()
+      throws Exception {
+    // Test full upserts with consistency=NONE where old segment has more docs
+    // Using real segments instead of mocks to avoid complex data source mocking
+    UpsertContext upsertContext =
+        _contextBuilder.setConsistencyMode(UpsertConfig.ConsistencyMode.NONE).setDropOutOfOrderRecord(true).build();
+
+    ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+        new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, upsertContext);
+
+    String segmentName = "test_segment";
+
+    // Create first real segment with 3 records
+    int[] primaryKeys1 = new int[]{10, 30, 40};
+    int[] timestamps1 = new int[]{1500, 3500, 4000};
+    ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
+    ImmutableSegmentImpl segment1 = createRealSegment(segmentName, primaryKeys1, timestamps1, validDocIds1);
+
+    upsertMetadataManager.addSegment(segment1);
+
+    Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
+    assertEquals(recordLocationMap.size(), 3);
+
+    // Create second real segment with 2 records (subset of first)
+    int[] primaryKeys2 = new int[]{10, 30};
+    int[] timestamps2 = new int[]{1500, 3500};
+    ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
+    ImmutableSegmentImpl segment2 = createRealSegment(segmentName, primaryKeys2, timestamps2, validDocIds2);
+
+    // Replace segment - RecordInfoReader will read real data from segment2
+    upsertMetadataManager.replaceSegment(segment2, segment1);
+
+    assertEquals(recordLocationMap.size(), 2);
+    checkRecordLocation(recordLocationMap, 10, segment2, 0, 1500, HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 30, segment2, 1, 3500, HashFunction.NONE);
+    assertEquals(segment2.getValidDocIds().getMutableRoaringBitmap().getCardinality(), 2);
+
+    upsertMetadataManager.stop();
+    upsertMetadataManager.close();
+
+    // Clean up real segments
+    segment1.destroy();
+    segment2.destroy();
+  }
+
+  @Test
+  public void testFullUpsertRegularConsistencyMode()
+      throws IOException {
+    // Test full upserts with regular consistency mode (not NONE) - no reversion should occur
+    UpsertContext upsertContext = _contextBuilder.setConsistencyMode(UpsertConfig.ConsistencyMode.SNAPSHOT).build();
+
+    ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+        new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, upsertContext);
+
+    int numRecords1 = 3;
+    int[] primaryKeys1 = new int[]{100, 200, 300};
+    int[] timestamps1 = new int[]{10000, 20000, 30000};
+    ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
+    for (int i = 0; i < numRecords1; i++) {
+      validDocIds1.add(i);
+    }
+    List<PrimaryKey> primaryKeysList1 = getPrimaryKeyList(numRecords1, primaryKeys1);
+    ImmutableSegmentImpl segment1 =
+        mockImmutableSegmentWithTimestamps(1, validDocIds1, null, primaryKeysList1, timestamps1);
+    List<RecordInfo> recordInfoList1 =
+        getRecordInfoListWithIntegerComparison(numRecords1, primaryKeys1, timestamps1, null);
+
+    upsertMetadataManager.addSegment(segment1, validDocIds1, null, recordInfoList1.iterator());
+
+    Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
+    assertEquals(recordLocationMap.size(), 3);
+    int numRecords2 = 1;
+    int[] primaryKeys2 = new int[]{100};
+    int[] timestamps2 = new int[]{15000};
+    ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
+    for (int i = 0; i < numRecords2; i++) {
+      validDocIds2.add(i);
+    }
+    List<PrimaryKey> primaryKeysList2 = getPrimaryKeyList(numRecords2, primaryKeys2);
+    ImmutableSegmentImpl segment2 =
+        mockImmutableSegmentWithTimestamps(1, validDocIds2, null, primaryKeysList2, timestamps2);
+
+    upsertMetadataManager.replaceSegment(segment2, segment1);
+
+    assertEquals(recordLocationMap.size(), 1);
+    checkRecordLocation(recordLocationMap, 100, segment2, 0, 15000, HashFunction.NONE);
+
+    assertEquals(segment1.getValidDocIds().getMutableRoaringBitmap().getCardinality(), 2);
+    assertEquals(segment2.getValidDocIds().getMutableRoaringBitmap().getCardinality(), 1);
+
+    upsertMetadataManager.stop();
+    upsertMetadataManager.close();
+  }
+
+  @Test
+  public void testRevertOnlyAppliesForConsumingSegmentSeal()
+      throws IOException {
+    UpsertContext upsertContext =
+        _contextBuilder.setConsistencyMode(UpsertConfig.ConsistencyMode.NONE).setDropOutOfOrderRecord(true).build();
+
+    ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+        new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, upsertContext);
+
+    int[] mutablePrimaryKeys = new int[]{10, 20, 30};
+    ThreadSafeMutableRoaringBitmap validDocIdsMutable = new ThreadSafeMutableRoaringBitmap();
+    MutableSegment mutableSegment = mockMutableSegmentWithDataSource(1, validDocIdsMutable, null, mutablePrimaryKeys);
+
+    upsertMetadataManager.addRecord(mutableSegment,
+        new RecordInfo(makePrimaryKey(10), 0, Integer.valueOf(1000), false));
+    upsertMetadataManager.addRecord(mutableSegment,
+        new RecordInfo(makePrimaryKey(20), 1, Integer.valueOf(2000), false));
+    upsertMetadataManager.addRecord(mutableSegment,
+        new RecordInfo(makePrimaryKey(30), 2, Integer.valueOf(3000), false));
+
+    Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
+    assertEquals(recordLocationMap.size(), 3);
+    assertEquals(validDocIdsMutable.getMutableRoaringBitmap().getCardinality(), 3);
+
+    int numRecords = 2;
+    int[] primaryKeys = new int[]{10, 20};
+    int[] timestamps = new int[]{1500, 2500};
+    ThreadSafeMutableRoaringBitmap validDocIdsImmutable = new ThreadSafeMutableRoaringBitmap();
+    List<PrimaryKey> primaryKeysList = getPrimaryKeyList(numRecords, primaryKeys);
+    ImmutableSegmentImpl immutableSegment = mockImmutableSegmentWithTimestamps(1, validDocIdsImmutable, null,
+        primaryKeysList, timestamps);
+
+    // This should trigger the revert logic since old segment is mutable
+    upsertMetadataManager.replaceSegment(immutableSegment, validDocIdsImmutable, null,
+        getRecordInfoListWithIntegerComparison(numRecords, primaryKeys, timestamps, null).iterator(), mutableSegment);
+
+    // After replacement, the records from immutable segment should be present
+    assertEquals(recordLocationMap.size(), 2);
+    checkRecordLocation(recordLocationMap, 10, immutableSegment, 0, 1500, HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 20, immutableSegment, 1, 2500, HashFunction.NONE);
+
+    upsertMetadataManager.stop();
+    upsertMetadataManager.close();
+  }
+
+  @Test
+  public void testNoRevertForImmutableSegmentReplacement()
+      throws IOException {
+    // Test that revert logic is NOT applied when replacing immutable segment with another immutable segment
+    UpsertContext upsertContext =
+        _contextBuilder.setConsistencyMode(UpsertConfig.ConsistencyMode.NONE).setDropOutOfOrderRecord(true).build();
+
+    ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+        new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, upsertContext);
+
+    // Create first immutable segment with 3 records
+    int numRecords1 = 3;
+    int[] primaryKeys1 = new int[]{10, 20, 30};
+    int[] timestamps1 = new int[]{1000, 2000, 3000};
+    ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
+    for (int i = 0; i < numRecords1; i++) {
+      validDocIds1.add(i);
+    }
+    List<PrimaryKey> primaryKeysList1 = getPrimaryKeyList(numRecords1, primaryKeys1);
+    ImmutableSegmentImpl segment1 = mockImmutableSegmentWithTimestamps(1, validDocIds1, null,
+        primaryKeysList1, timestamps1);
+
+    upsertMetadataManager.addSegment(segment1, validDocIds1, null,
+        getRecordInfoListWithIntegerComparison(numRecords1, primaryKeys1, timestamps1, null).iterator());
+    Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
+    assertEquals(recordLocationMap.size(), 3);
+
+    int numRecords2 = 1;
+    int[] primaryKeys2 = new int[]{10};
+    int[] timestamps2 = new int[]{1500};
+    ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
+    for (int i = 0; i < numRecords2; i++) {
+      validDocIds2.add(i);
+    }
+    List<PrimaryKey> primaryKeysList2 = getPrimaryKeyList(numRecords2, primaryKeys2);
+    ImmutableSegmentImpl segment2 = mockImmutableSegmentWithTimestamps(1, validDocIds2, null,
+        primaryKeysList2, timestamps2);
+
+    long startTime = System.currentTimeMillis();
+    upsertMetadataManager.replaceSegment(segment2, validDocIds2, null,
+        getRecordInfoListWithIntegerComparison(numRecords2, primaryKeys2, timestamps2, null).iterator(), segment1);
+    long duration = System.currentTimeMillis() - startTime;
+
+    assertTrue(duration < 1000, "Immutable-to-immutable replacement should complete quickly, took: " + duration + "ms");

Review Comment:
   The hard-coded 1000ms timeout threshold may cause flaky test failures in 
slower CI environments or under load. Consider increasing this threshold or 
using a more flexible approach such as comparing execution time between revert 
and non-revert scenarios.
   ```suggestion
       assertTrue(duration < 30_000,
           "Immutable-to-immutable replacement should complete quickly (under 30000ms), took: " + duration + "ms");
   ```
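If a wall-clock assertion is kept at all, `System.nanoTime()` is a sturdier clock than `System.currentTimeMillis()`: the JDK documents `nanoTime()` as intended only for measuring elapsed time, whereas the wall clock can jump when the system time is adjusted. A minimal sketch, assuming a hypothetical `ElapsedTimer` helper that the test would wrap around the `replaceSegment(...)` call, combined with the reviewer's generous 30-second ceiling:

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical test helper: times an action with the monotonic System.nanoTime() clock. */
public final class ElapsedTimer {
  private ElapsedTimer() {
  }

  /** Runs the action and returns its duration in whole milliseconds (truncated). */
  public static long elapsedMillis(Runnable action) {
    long start = System.nanoTime();
    action.run();
    return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
  }

  public static void main(String[] args) {
    long generousBoundMs = 30_000; // ceiling suggested in the review comment
    // In the real test, this placeholder would wrap upsertMetadataManager.replaceSegment(...).
    Runnable replaceSegmentPlaceholder = () -> { };
    long duration = elapsedMillis(replaceSegmentPlaceholder);
    if (duration >= generousBoundMs) {
      throw new AssertionError("Replacement should complete quickly, took: " + duration + "ms");
    }
    System.out.println("took " + duration + "ms");
  }
}
```

A generous bound still catches a pathological slowdown without failing on a briefly overloaded CI host.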



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -659,37 +665,94 @@ public void replaceSegment(ImmutableSegment segment, @Nullable ThreadSafeMutable
       validDocIdsForOldSegment = getValidDocIdsForOldSegment(oldSegment);
     }
     if (validDocIdsForOldSegment != null && !validDocIdsForOldSegment.isEmpty()) {
-      int numKeysNotReplaced = validDocIdsForOldSegment.getCardinality();
-      // Add the new metric tracking here
-      if (_context.isDropOutOfOrderRecord() && _context.getConsistencyMode() == UpsertConfig.ConsistencyMode.NONE) {
-        // For Upsert tables when some of the records get dropped when dropOutOfOrderRecord is enabled, we do not
-        // store the original record location when keys are not replaced, this can potentially cause inconsistencies
-        // leading to some rows not getting dropped when reconsumed. This can be caused when a consuming segment
-        // that is consumed from a different server is replaced with the existing segment which consumed rows ahead
-        // of the other server
-        _logger.warn(
-            "Found {} primary keys not replaced when replacing segment: {} for upsert table with dropOutOfOrderRecord"
-                + " enabled with no consistency mode. This can potentially cause inconsistency between replicas",
-            numKeysNotReplaced, segmentName);
-        _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.REALTIME_UPSERT_INCONSISTENT_ROWS,
-            numKeysNotReplaced);
-      } else if (_partialUpsertHandler != null) {
-        // For partial-upsert table, because we do not restore the original record location when removing the primary
-        // keys not replaced, it can potentially cause inconsistency between replicas. This can happen when a
-        // consuming segment is replaced by a committed segment that is consumed from a different server with
-        // different records (some stream consumer cannot guarantee consuming the messages in the same order/
-        // when a segment is replaced with lesser consumed rows from the other server).
-        _logger.warn("Found {} primary keys not replaced when replacing segment: {} for partial-upsert table. This "
-            + "can potentially cause inconsistency between replicas", numKeysNotReplaced, segmentName);
-        _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.PARTIAL_UPSERT_KEYS_NOT_REPLACED,
-            numKeysNotReplaced);
+      checkForInconsistencies(segment, validDocIds, queryableDocIds, oldSegment, validDocIdsForOldSegment, segmentName);
+      removeSegment(oldSegment, validDocIdsForOldSegment);
+    }
+  }
+
+  void checkForInconsistencies(ImmutableSegment segment, ThreadSafeMutableRoaringBitmap validDocIds,

Review Comment:
   This method should have protected visibility instead of package-private to 
maintain consistency with the other protected methods in this class hierarchy.
   ```suggestion
     protected void checkForInconsistencies(ImmutableSegment segment, ThreadSafeMutableRoaringBitmap validDocIds,
   ```
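In the refactored `replaceSegment` path, `numKeysNotReplaced` is the cardinality of the old segment's valid doc IDs that are still set once the new segment's records have been added. The sketch below is a simplified, hypothetical model of that count (the `KeysNotReplacedModel` class is illustrative, not Pinot code): `java.util.BitSet` and a `HashMap` stand in for Pinot's `MutableRoaringBitmap` and record-location map, each key is assumed to appear at most once per segment, and comparison-column (timestamp) checks are ignored. It uses the inputs of `testPartialUpsertOldSegmentTriggerReversion`, where a consuming segment with keys 1 to 4 is sealed into a committed segment that holds only keys 1 and 3.

```java
import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical, simplified model of counting "keys not replaced" during segment replacement.
 * BitSet and HashMap stand in for Pinot's MutableRoaringBitmap and record-location map;
 * comparison-column checks are ignored and keys are assumed unique within a segment.
 */
public final class KeysNotReplacedModel {
  private KeysNotReplacedModel() {
  }

  /**
   * @param oldSegmentKeys primary key stored at each doc ID of the old segment
   * @param newSegmentKeys primary keys contained in the replacing segment
   * @return number of old-segment docs that are still valid once the new segment takes over
   */
  public static int countKeysNotReplaced(int[] oldSegmentKeys, int[] newSegmentKeys) {
    // Before replacement, every doc in the old segment is valid and owns its key.
    BitSet validDocIdsForOldSegment = new BitSet();
    Map<Integer, Integer> keyToOldDocId = new HashMap<>();
    for (int docId = 0; docId < oldSegmentKeys.length; docId++) {
      validDocIdsForOldSegment.set(docId);
      keyToOldDocId.put(oldSegmentKeys[docId], docId);
    }
    // Each key present in the new segment moves there, invalidating the old doc for that key.
    for (int key : newSegmentKeys) {
      Integer oldDocId = keyToOldDocId.remove(key);
      if (oldDocId != null) {
        validDocIdsForOldSegment.clear(oldDocId);
      }
    }
    // Docs still set were not replaced; this plays the role of getCardinality().
    return validDocIdsForOldSegment.cardinality();
  }

  public static void main(String[] args) {
    // Inputs from testPartialUpsertOldSegmentTriggerReversion: keys 1-4 sealed into a segment with keys 1 and 3.
    System.out.println(countKeysNotReplaced(new int[]{1, 2, 3, 4}, new int[]{1, 3})); // 2
    // Inputs from testPartialUpsertSameDocsReplacement: every key is present in both segments.
    System.out.println(countKeysNotReplaced(new int[]{1, 2, 3}, new int[]{1, 2, 3})); // 0
  }
}
```

In the first call, keys 2 and 4 remain, which matches the test's expectation that only keys 1 and 3 end up in `recordLocationMap` after the consuming segment is sealed.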



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -659,37 +665,94 @@ public void replaceSegment(ImmutableSegment segment, 
@Nullable ThreadSafeMutable
       validDocIdsForOldSegment = getValidDocIdsForOldSegment(oldSegment);
     }
    if (validDocIdsForOldSegment != null && !validDocIdsForOldSegment.isEmpty()) {
-      int numKeysNotReplaced = validDocIdsForOldSegment.getCardinality();
-      // Add the new metric tracking here
-      if (_context.isDropOutOfOrderRecord() && _context.getConsistencyMode() == UpsertConfig.ConsistencyMode.NONE) {
-        // For Upsert tables when some of the records get dropped when dropOutOfOrderRecord is enabled, we donot
-        // store the original record location when keys are not replaced, this can potentially cause inconsistencies
-        // leading to some rows not getting dropped when reconsumed. This can be caused when a consuming segment
-        // that is consumed from a different server is replaced with the existing segment which consumed rows ahead
-        // of the other server
-        _logger.warn(
-            "Found {} primary keys not replaced when replacing segment: {} for upsert table with dropOutOfOrderRecord"
-                + " enabled with no consistency mode. This can potentially cause inconsistency between replicas",
-            numKeysNotReplaced, segmentName);
-        _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.REALTIME_UPSERT_INCONSISTENT_ROWS,
-            numKeysNotReplaced);
-      } else if (_partialUpsertHandler != null) {
-        // For partial-upsert table, because we do not restore the original record location when removing the primary
-        // keys not replaced, it can potentially cause inconsistency between replicas. This can happen when a
-        // consuming segment is replaced by a committed segment that is consumed from a different server with
-        // different records (some stream consumer cannot guarantee consuming the messages in the same order/
-        // when a segment is replaced with lesser consumed rows from the other server).
-        _logger.warn("Found {} primary keys not replaced when replacing segment: {} for partial-upsert table. This "
-            + "can potentially cause inconsistency between replicas", numKeysNotReplaced, segmentName);
-        _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.PARTIAL_UPSERT_KEYS_NOT_REPLACED,
-            numKeysNotReplaced);
+      checkForInconsistencies(segment, validDocIds, queryableDocIds, oldSegment, validDocIdsForOldSegment, segmentName);
+      removeSegment(oldSegment, validDocIdsForOldSegment);
+    }
+  }
+
+  void checkForInconsistencies(ImmutableSegment segment, ThreadSafeMutableRoaringBitmap validDocIds,
+      ThreadSafeMutableRoaringBitmap queryableDocIds, IndexSegment oldSegment,
+      MutableRoaringBitmap validDocIdsForOldSegment, String segmentName) {
+    int numKeysNotReplaced = validDocIdsForOldSegment.getCardinality();
+    boolean isConsumingSegmentSeal = !(oldSegment instanceof ImmutableSegment);
+    // For partial-upsert table and upsert table with dropOutOfOrder=true & consistencyMode = NONE, we do not store
+    // the previous record location when removing the primary keys not replaced, it can potentially cause inconsistency
+    // between replicas. This can happen when a consuming segment is replaced by a committed segment that is consumed
+    // from a different server with different records (some stream consumer cannot guarantee consuming the messages in
+    // the same order/ when a segment is replaced with lesser consumed rows from the other server).
+    if (isConsumingSegmentSeal && _context.isDropOutOfOrderRecord()
+        && _context.getConsistencyMode() == UpsertConfig.ConsistencyMode.NONE) {
+      _logger.warn("Found {} primary keys not replaced when sealing consuming segment: {} for upsert table with "
+              + "dropOutOfOrderRecord enabled with no consistency mode. This can potentially cause inconsistency "
+              + "between replicas. Reverting back metadata changes and triggering segment replacement.",
+          numKeysNotReplaced,
+          segmentName);
+      revertSegmentUpsertMetadataWithRetry(segment, validDocIds, queryableDocIds, oldSegment, segmentName);
+    } else if (isConsumingSegmentSeal && _partialUpsertHandler != null) {
+      _logger.warn("Found {} primary keys not replaced when sealing consuming segment: {} for partial-upsert table. "
+          + "This can potentially cause inconsistency between replicas. "
+          + "Reverting metadata changes and triggering segment replacement.", numKeysNotReplaced, segmentName);
+      revertSegmentUpsertMetadataWithRetry(segment, validDocIds, queryableDocIds, oldSegment, segmentName);
+    } else {
+      _logger.warn("Found {} primary keys not replaced for the segment: {}.", numKeysNotReplaced, segmentName);
+    }
+  }
+
+  /**
+   * Reverts segment upsert metadata and retries addOrReplaceSegment with a maximum retry limit to prevent infinite
+   * recursion in case of persistent inconsistencies.
+   */
+  void revertSegmentUpsertMetadataWithRetry(ImmutableSegment segment, ThreadSafeMutableRoaringBitmap validDocIds,
+      ThreadSafeMutableRoaringBitmap queryableDocIds, IndexSegment oldSegment, String segmentName) {

Review Comment:
   This method should have protected visibility instead of package-private to maintain consistency with the other protected methods in this class hierarchy. Protected visibility would also let subclasses outside this package override it, which helps extensibility.
   ```suggestion
     protected void revertSegmentUpsertMetadataWithRetry(ImmutableSegment segment,
         ThreadSafeMutableRoaringBitmap validDocIds, ThreadSafeMutableRoaringBitmap queryableDocIds,
         IndexSegment oldSegment, String segmentName) {
   ```
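
The sketch below summarizes two pieces of the diff above. The first is the branching in `checkForInconsistencies`. A revert is triggered only when a consuming (mutable) segment is being sealed and the table either has `dropOutOfOrderRecord` enabled with consistency mode `NONE`, or is a partial-upsert table. Every other case, including replacing an already-immutable segment, only logs a warning. The second is the bounded retry described in the Javadoc of `revertSegmentUpsertMetadataWithRetry`.

This is a hedged sketch, not the PR's implementation. The following are hypothetical simplifications:
- the `ConsistencyMode` stand-in enum;
- the `Action` enum;
- `MAX_RETRIES` (the real limit is not shown in this excerpt);
- the `BooleanSupplier` attempt callback.

```java
import java.util.function.BooleanSupplier;

public class UpsertInconsistencySketch {
  // Hypothetical stand-in for UpsertConfig.ConsistencyMode.
  enum ConsistencyMode { NONE, SYNC, SNAPSHOT }

  // Hypothetical outcome of the inconsistency check.
  enum Action { REVERT_AND_RETRY, WARN_ONLY }

  // Hypothetical retry cap; the PR's actual limit is not visible in this excerpt.
  static final int MAX_RETRIES = 3;

  // Mirrors the branch order of checkForInconsistencies in the diff above.
  static Action decide(boolean isConsumingSegmentSeal, boolean dropOutOfOrderRecord,
      ConsistencyMode consistencyMode, boolean isPartialUpsert) {
    if (isConsumingSegmentSeal && dropOutOfOrderRecord && consistencyMode == ConsistencyMode.NONE) {
      return Action.REVERT_AND_RETRY;
    }
    if (isConsumingSegmentSeal && isPartialUpsert) {
      return Action.REVERT_AND_RETRY;
    }
    return Action.WARN_ONLY;
  }

  // Runs the attempt at most 1 + MAX_RETRIES times and stops at the first success,
  // so a persistent inconsistency cannot cause unbounded retries.
  static boolean retryBounded(BooleanSupplier attempt) {
    for (int i = 0; i <= MAX_RETRIES; i++) {
      if (attempt.getAsBoolean()) {
        return true;
      }
    }
    return false; // retries exhausted; a caller would log and fall back here
  }

  public static void main(String[] args) {
    System.out.println(decide(true, true, ConsistencyMode.NONE, false));  // REVERT_AND_RETRY
    System.out.println(decide(true, true, ConsistencyMode.SYNC, false));  // WARN_ONLY
    System.out.println(decide(true, false, ConsistencyMode.NONE, true));  // REVERT_AND_RETRY
    System.out.println(decide(false, true, ConsistencyMode.NONE, true));  // WARN_ONLY

    int[] calls = {0};
    boolean ok = retryBounded(() -> ++calls[0] >= 2);
    System.out.println(ok + " after " + calls[0] + " attempts"); // true after 2 attempts
    calls[0] = 0;
    boolean gaveUp = !retryBounded(() -> { calls[0]++; return false; });
    System.out.println(gaveUp + " after " + calls[0] + " attempts"); // true after 4 attempts
  }
}
```

Writing the cap as an explicit counter, rather than an unbounded recursive call back into `addOrReplaceSegment`, makes the termination bound easy to read and test. This excerpt does not show whether the PR uses a loop or a depth argument.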



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

