Copilot commented on code in PR #17324:
URL: https://github.com/apache/pinot/pull/17324#discussion_r2651134937


##########
pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java:
##########
@@ -784,6 +814,17 @@ private List<RecordInfo> getRecordInfoListForTTL(int numRecords, int[] primaryKe
     return recordInfoList;
   }
 
+  // Helper method for new reversion tests that need Integer comparison values
+  private List<RecordInfo> getRecordInfoListWithIntegerComparison(int numRecords, int[] primaryKeys, int[] timestamps,

Review Comment:
   The method name `getRecordInfoListWithIntegerComparison` is inconsistent
with the naming of the neighboring helper `getRecordInfoListForTTL`. Consider
overloading `getRecordInfoList` instead (an overload must differ from the
existing method in its parameter types to compile) or shortening the name to
`getRecordInfoListWithIntComparison` for consistency with the codebase's naming
style.
   ```suggestion
     private List<RecordInfo> getRecordInfoListWithIntComparison(int numRecords, int[] primaryKeys, int[] timestamps,
   ```



##########
pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java:
##########
@@ -1724,4 +1901,411 @@ public int hashCode() {
       return _value;
     }
   }
+
+  // Tests for upsert metadata reversion functionality
+  @Test
+  public void testPartialUpsertSameDocsReplacement() throws IOException {
+    // Test partial upserts with old and new segments having same number of docs
+    // This test verifies that when all keys are present, no reversion occurs
+    PartialUpsertHandler mockPartialUpsertHandler = mock(PartialUpsertHandler.class);
+    UpsertContext upsertContext = _contextBuilder.setPartialUpsertHandler(mockPartialUpsertHandler)
+        .setConsistencyMode(UpsertConfig.ConsistencyMode.NONE).build();
+
+    ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+        new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, upsertContext);
+
+    int numRecords1 = 3;
+    int[] primaryKeys1 = new int[]{1, 2, 3};
+    int[] timestamps1 = new int[]{100, 200, 300};
+    ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
+    for (int i = 0; i < numRecords1; i++) {
+      validDocIds1.add(i);
+    }
+    List<PrimaryKey> primaryKeysList1 = getPrimaryKeyList(numRecords1, primaryKeys1);
+    ImmutableSegmentImpl segment1 = mockImmutableSegmentWithTimestamps(1, validDocIds1, null,
+        primaryKeysList1, timestamps1);
+    List<RecordInfo> recordInfoList1 = getRecordInfoListWithIntegerComparison(numRecords1, primaryKeys1,
+        timestamps1, null);
+
+    upsertMetadataManager.addSegment(segment1, validDocIds1, null, recordInfoList1.iterator());
+
+    Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
+    assertEquals(recordLocationMap.size(), 3);
+    checkRecordLocation(recordLocationMap, 1, segment1, 0, 100, HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 2, segment1, 1, 200, HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 3, segment1, 2, 300, HashFunction.NONE);
+
+    // Create new segment with same 3 records but updated timestamps
+    int numRecords2 = 3;
+    int[] primaryKeys2 = new int[]{1, 2, 3};
+    int[] timestamps2 = new int[]{150, 250, 350};
+    ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
+    for (int i = 0; i < numRecords2; i++) {
+      validDocIds2.add(i);
+    }
+    List<PrimaryKey> primaryKeysList2 = getPrimaryKeyList(numRecords2, primaryKeys2);
+    ImmutableSegmentImpl segment2 = mockImmutableSegmentWithTimestamps(1, validDocIds2, null,
+        primaryKeysList2, timestamps2);
+    List<RecordInfo> recordInfoList2 = getRecordInfoListWithIntegerComparison(numRecords2, primaryKeys2,
+        timestamps2, null);
+
+    // Replace segment - should trigger reversion logic but no reversion needed since all keys are present
+    upsertMetadataManager.replaceSegment(segment2, segment1);
+
+    // Verify replacement - all records should be in new segment
+    assertEquals(recordLocationMap.size(), 3);
+    checkRecordLocation(recordLocationMap, 1, segment2, 0, 150, HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 2, segment2, 1, 250, HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 3, segment2, 2, 350, HashFunction.NONE);
+
+    // New segment should have all docs valid
+    assertEquals(segment2.getValidDocIds().getMutableRoaringBitmap().getCardinality(), 3);
+
+    upsertMetadataManager.stop();
+    upsertMetadataManager.close();
+  }
+
+  @Test
+  public void testPartialUpsertOldSegmentTriggerReversion() throws IOException {
+    // Test partial upserts with consuming (mutable) segment being sealed - revert should be triggered
+    // Note: Revert logic only applies when sealing a consuming segment, not for immutable segment replacement
+    PartialUpsertHandler mockPartialUpsertHandler = mock(PartialUpsertHandler.class);
+    UpsertContext upsertContext = _contextBuilder.setPartialUpsertHandler(mockPartialUpsertHandler)
+        .setConsistencyMode(UpsertConfig.ConsistencyMode.NONE).build();
+
+    ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+        new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, upsertContext);
+
+    // Create a mutable (consuming) segment with 4 records - use mockMutableSegmentWithDataSource
+    // to support removeSegment which needs to read primary keys
+    int[] mutablePrimaryKeys = new int[]{1, 2, 3, 4};
+    ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
+    MutableSegment mutableSegment = mockMutableSegmentWithDataSource(1, validDocIds1, null, mutablePrimaryKeys);
+
+    // Add records to the mutable segment
+    upsertMetadataManager.addRecord(mutableSegment, new RecordInfo(makePrimaryKey(1), 0, new Integer(100), false));
+    upsertMetadataManager.addRecord(mutableSegment, new RecordInfo(makePrimaryKey(2), 1, new Integer(200), false));
+    upsertMetadataManager.addRecord(mutableSegment, new RecordInfo(makePrimaryKey(3), 2, new Integer(300), false));
+    upsertMetadataManager.addRecord(mutableSegment, new RecordInfo(makePrimaryKey(4), 3, new Integer(400), false));

Review Comment:
   Avoid the deprecated `new Integer()` constructor. Use `Integer.valueOf()` or
autoboxing (pass the `int` value directly) instead. The `new Integer()`
constructor has been deprecated since Java 9 and marked for removal since
Java 16.
   ```suggestion
       upsertMetadataManager.addRecord(mutableSegment, new RecordInfo(makePrimaryKey(1), 0, 100, false));
       upsertMetadataManager.addRecord(mutableSegment, new RecordInfo(makePrimaryKey(2), 1, 200, false));
       upsertMetadataManager.addRecord(mutableSegment, new RecordInfo(makePrimaryKey(3), 2, 300, false));
       upsertMetadataManager.addRecord(mutableSegment, new RecordInfo(makePrimaryKey(4), 3, 400, false));
   ```
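
Passing the bare literal works because the comparison-value parameter takes a boxed object. Assuming `RecordInfo` declares it as a raw `Comparable` (the diff does not show the declaration), the compiler autoboxes `100` to an `Integer`. The standalone sketch below uses a hypothetical `SketchRecord` stand-in, not Pinot's `RecordInfo`, to show that the autoboxed value is an `Integer` and still compares as expected:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in that holds a raw Comparable comparison value, like the
// parameter shape assumed above. This is NOT Pinot's RecordInfo.
final class SketchRecord {
  final int docId;
  @SuppressWarnings("rawtypes")
  final Comparable comparisonValue;

  @SuppressWarnings("rawtypes")
  SketchRecord(int docId, Comparable comparisonValue) {
    this.docId = docId;
    this.comparisonValue = comparisonValue;
  }
}

final class AutoboxComparisonDemo {
  // Builds one record per timestamp; each int is autoboxed to Integer, which implements Comparable.
  static List<SketchRecord> build(int[] timestamps) {
    List<SketchRecord> records = new ArrayList<>();
    for (int docId = 0; docId < timestamps.length; docId++) {
      records.add(new SketchRecord(docId, timestamps[docId]));
    }
    return records;
  }

  // Compares two records through the raw Comparable contract.
  @SuppressWarnings("unchecked")
  static int compare(SketchRecord a, SketchRecord b) {
    return a.comparisonValue.compareTo(b.comparisonValue);
  }

  public static void main(String[] args) {
    List<SketchRecord> records = build(new int[]{100, 200});
    System.out.println(records.get(0).comparisonValue.getClass().getSimpleName()); // prints "Integer"
    System.out.println(compare(records.get(1), records.get(0)) > 0); // prints "true"
  }
}
```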



##########
pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java:
##########
@@ -784,6 +814,17 @@ private List<RecordInfo> getRecordInfoListForTTL(int numRecords, int[] primaryKe
     return recordInfoList;
   }
 
+  // Helper method for new reversion tests that need Integer comparison values
+  private List<RecordInfo> getRecordInfoListWithIntegerComparison(int numRecords, int[] primaryKeys, int[] timestamps,
+      @Nullable boolean[] deleteRecordFlags) {
+    List<RecordInfo> recordInfoList = new ArrayList<>();
+    for (int i = 0; i < numRecords; i++) {
+      recordInfoList.add(new RecordInfo(makePrimaryKey(primaryKeys[i]), i, Integer.valueOf(timestamps[i]),

Review Comment:
   Wrapping the value in `Integer.valueOf()` is unnecessary: `timestamps[i]` can
be passed directly and autoboxing will convert it. Note that javac compiles
autoboxing to the same `Integer.valueOf()` call, so this change does not reduce
object creation; the benefit is readability.
   ```suggestion
         recordInfoList.add(new RecordInfo(makePrimaryKey(primaryKeys[i]), i, timestamps[i],
   ```
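
To see why this is a readability change rather than an allocation change: for an `int` argument, javac compiles autoboxing to `Integer.valueOf(int)`, and JLS §5.1.7 guarantees that boxing any value in -128..127 yields the same cached instance. A minimal standalone sketch (class and method names are illustrative only):

```java
final class AutoboxingDemo {
  // Implicit boxing: javac emits Integer.valueOf(value) for this return statement.
  static Integer autoboxed(int value) {
    return value;
  }

  // Explicit boxing, equivalent to the method above.
  static Integer explicitValueOf(int value) {
    return Integer.valueOf(value);
  }

  public static void main(String[] args) {
    // Values in [-128, 127] come from the Integer cache, so both paths return the same object.
    System.out.println(autoboxed(100) == explicitValueOf(100)); // prints "true"
    // Outside that range identity is not guaranteed, but the values are always equal.
    System.out.println(autoboxed(1500).equals(explicitValueOf(1500))); // prints "true"
  }
}
```

In the helper, `timestamps[i]` can therefore be passed directly with no change in behavior.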



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -620,6 +624,8 @@ protected void doReplaceSegment(ImmutableSegment segment, IndexSegment oldSegmen
         System.currentTimeMillis() - startTimeMs, numPrimaryKeys);
   }
 
+  private static final int MAX_UPSERT_REVERT_RETRIES = 3;

Review Comment:
   The retry limit is hardcoded to 3. Consider making it configurable, ideally
through UpsertConfig, so operators can tune retry behavior to their consistency
requirements and performance constraints. The suggestion below is a lighter
alternative that reads a JVM system property instead of table config.
   ```suggestion
     private static final String MAX_UPSERT_REVERT_RETRIES_SYS_PROP = "pinot.upsert.maxRevertRetries";
     private static final int DEFAULT_MAX_UPSERT_REVERT_RETRIES = 3;
     private static final int MAX_UPSERT_REVERT_RETRIES =
         Integer.getInteger(MAX_UPSERT_REVERT_RETRIES_SYS_PROP, DEFAULT_MAX_UPSERT_REVERT_RETRIES);
   ```
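
A minimal sketch of the suggested pattern. Assumptions: the property name `pinot.upsert.maxRevertRetries` comes from the suggestion and is hypothetical (not an existing Pinot setting), and the limit is taken to count retries after the first attempt, since the diff does not show how `MAX_UPSERT_REVERT_RETRIES` is consumed. `Integer.getInteger` returns the default when the property is unset or not a valid integer; the sketch also clamps negative values to 0 and, unlike the suggestion, re-reads the property on each call so it can be exercised in isolation:

```java
import java.util.function.BooleanSupplier;

final class RevertRetrySketch {
  // Hypothetical property name, taken from the review suggestion.
  static final String MAX_RETRIES_PROP = "pinot.upsert.maxRevertRetries";
  static final int DEFAULT_MAX_RETRIES = 3;

  // Integer.getInteger falls back to the default for a missing or unparsable property;
  // Math.max rejects negative values.
  static int resolveMaxRetries() {
    return Math.max(0, Integer.getInteger(MAX_RETRIES_PROP, DEFAULT_MAX_RETRIES));
  }

  // Makes one attempt plus up to maxRetries retries; returns true as soon as an attempt succeeds.
  static boolean runWithRetries(BooleanSupplier attempt, int maxRetries) {
    for (int i = 0; i <= maxRetries; i++) {
      if (attempt.getAsBoolean()) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    int maxRetries = resolveMaxRetries();
    int[] calls = {0};
    // Simulated revert that succeeds on its second attempt.
    boolean ok = runWithRetries(() -> ++calls[0] >= 2, maxRetries);
    System.out.println("succeeded=" + ok + " after " + calls[0] + " attempts");
  }
}
```

Because the suggestion stores the value in a `static final` field, it is fixed when the class is initialized: changing it requires a JVM restart and applies to every table on the server. A per-table `UpsertConfig` field, as the comment proposes, would avoid both limitations.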



##########
pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java:
##########
@@ -1724,4 +1901,411 @@ public int hashCode() {
       return _value;
     }
   }
+
+  // Tests for upsert metadata reversion functionality
+  @Test
+  public void testPartialUpsertSameDocsReplacement() throws IOException {
+    // Test partial upserts with old and new segments having same number of docs
+    // This test verifies that when all keys are present, no reversion occurs
+    PartialUpsertHandler mockPartialUpsertHandler = mock(PartialUpsertHandler.class);
+    UpsertContext upsertContext = _contextBuilder.setPartialUpsertHandler(mockPartialUpsertHandler)
+        .setConsistencyMode(UpsertConfig.ConsistencyMode.NONE).build();
+
+    ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+        new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, upsertContext);
+
+    int numRecords1 = 3;
+    int[] primaryKeys1 = new int[]{1, 2, 3};
+    int[] timestamps1 = new int[]{100, 200, 300};
+    ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
+    for (int i = 0; i < numRecords1; i++) {
+      validDocIds1.add(i);
+    }
+    List<PrimaryKey> primaryKeysList1 = getPrimaryKeyList(numRecords1, primaryKeys1);
+    ImmutableSegmentImpl segment1 = mockImmutableSegmentWithTimestamps(1, validDocIds1, null,
+        primaryKeysList1, timestamps1);
+    List<RecordInfo> recordInfoList1 = getRecordInfoListWithIntegerComparison(numRecords1, primaryKeys1,
+        timestamps1, null);
+
+    upsertMetadataManager.addSegment(segment1, validDocIds1, null, recordInfoList1.iterator());
+
+    Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
+    assertEquals(recordLocationMap.size(), 3);
+    checkRecordLocation(recordLocationMap, 1, segment1, 0, 100, HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 2, segment1, 1, 200, HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 3, segment1, 2, 300, HashFunction.NONE);
+
+    // Create new segment with same 3 records but updated timestamps
+    int numRecords2 = 3;
+    int[] primaryKeys2 = new int[]{1, 2, 3};
+    int[] timestamps2 = new int[]{150, 250, 350};
+    ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
+    for (int i = 0; i < numRecords2; i++) {
+      validDocIds2.add(i);
+    }
+    List<PrimaryKey> primaryKeysList2 = getPrimaryKeyList(numRecords2, primaryKeys2);
+    ImmutableSegmentImpl segment2 = mockImmutableSegmentWithTimestamps(1, validDocIds2, null,
+        primaryKeysList2, timestamps2);
+    List<RecordInfo> recordInfoList2 = getRecordInfoListWithIntegerComparison(numRecords2, primaryKeys2,
+        timestamps2, null);
+
+    // Replace segment - should trigger reversion logic but no reversion needed since all keys are present
+    upsertMetadataManager.replaceSegment(segment2, segment1);
+
+    // Verify replacement - all records should be in new segment
+    assertEquals(recordLocationMap.size(), 3);
+    checkRecordLocation(recordLocationMap, 1, segment2, 0, 150, HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 2, segment2, 1, 250, HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 3, segment2, 2, 350, HashFunction.NONE);
+
+    // New segment should have all docs valid
+    assertEquals(segment2.getValidDocIds().getMutableRoaringBitmap().getCardinality(), 3);
+
+    upsertMetadataManager.stop();
+    upsertMetadataManager.close();
+  }
+
+  @Test
+  public void testPartialUpsertOldSegmentTriggerReversion() throws IOException {
+    // Test partial upserts with consuming (mutable) segment being sealed - revert should be triggered
+    // Note: Revert logic only applies when sealing a consuming segment, not for immutable segment replacement
+    PartialUpsertHandler mockPartialUpsertHandler = mock(PartialUpsertHandler.class);
+    UpsertContext upsertContext = _contextBuilder.setPartialUpsertHandler(mockPartialUpsertHandler)
+        .setConsistencyMode(UpsertConfig.ConsistencyMode.NONE).build();
+
+    ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+        new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, upsertContext);
+
+    // Create a mutable (consuming) segment with 4 records - use mockMutableSegmentWithDataSource
+    // to support removeSegment which needs to read primary keys
+    int[] mutablePrimaryKeys = new int[]{1, 2, 3, 4};
+    ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
+    MutableSegment mutableSegment = mockMutableSegmentWithDataSource(1, validDocIds1, null, mutablePrimaryKeys);
+
+    // Add records to the mutable segment
+    upsertMetadataManager.addRecord(mutableSegment, new RecordInfo(makePrimaryKey(1), 0, new Integer(100), false));
+    upsertMetadataManager.addRecord(mutableSegment, new RecordInfo(makePrimaryKey(2), 1, new Integer(200), false));
+    upsertMetadataManager.addRecord(mutableSegment, new RecordInfo(makePrimaryKey(3), 2, new Integer(300), false));
+    upsertMetadataManager.addRecord(mutableSegment, new RecordInfo(makePrimaryKey(4), 3, new Integer(400), false));
+
+    Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
+    assertEquals(recordLocationMap.size(), 4);
+    assertEquals(validDocIds1.getMutableRoaringBitmap().getCardinality(), 4);
+
+    int numRecords2 = 2;
+    int[] primaryKeys2 = new int[]{1, 3};
+    int[] timestamps2 = new int[]{150, 350};
+    ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
+    List<PrimaryKey> primaryKeysList2 = getPrimaryKeyList(numRecords2, primaryKeys2);
+    ImmutableSegmentImpl segment2 = mockImmutableSegmentWithTimestamps(1, validDocIds2, null,
+        primaryKeysList2, timestamps2);
+
+    // Replace mutable with immutable (consuming segment seal) - revert SHOULD be triggered
+    upsertMetadataManager.replaceSegment(segment2, validDocIds2, null,
+        getRecordInfoListWithIntegerComparison(numRecords2, primaryKeys2, timestamps2, null).iterator(),
+        mutableSegment);
+
+    assertEquals(recordLocationMap.size(), 2);
+    assertEquals(segment2.getValidDocIds().getMutableRoaringBitmap().getCardinality(), 2);
+    checkRecordLocation(recordLocationMap, 1, segment2, 0, 150, HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 3, segment2, 1, 350, HashFunction.NONE);
+
+    // Mutable segment's validDocIds should be 0 after removal
+    assertEquals(validDocIds1.getMutableRoaringBitmap().getCardinality(), 0);
+    upsertMetadataManager.stop();
+    upsertMetadataManager.close();
+  }
+
+  @Test
+  public void testPartialUpsertOldSegmentLesserDocs() throws IOException {
+    // Test partial upserts with old segment having fewer docs than new segment
+    PartialUpsertHandler mockPartialUpsertHandler = mock(PartialUpsertHandler.class);
+    UpsertContext upsertContext = _contextBuilder.setPartialUpsertHandler(mockPartialUpsertHandler)
+        .setConsistencyMode(UpsertConfig.ConsistencyMode.NONE).build();
+
+    ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+        new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, upsertContext);
+
+    int numRecords1 = 2;
+    int[] primaryKeys1 = new int[]{1, 2};
+    int[] timestamps1 = new int[]{100, 200};
+    ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
+    for (int i = 0; i < numRecords1; i++) {
+      validDocIds1.add(i);
+    }
+    List<PrimaryKey> primaryKeysList1 = getPrimaryKeyList(numRecords1, primaryKeys1);
+    ImmutableSegmentImpl segment1 =
+        mockImmutableSegmentWithTimestamps(1, validDocIds1, null, primaryKeysList1, timestamps1);
+    List<RecordInfo> recordInfoList1 =
+        getRecordInfoListWithIntegerComparison(numRecords1, primaryKeys1, timestamps1, null);
+
+    upsertMetadataManager.addSegment(segment1, validDocIds1, null, recordInfoList1.iterator());
+
+    Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
+    assertEquals(recordLocationMap.size(), 2);
+
+    int numRecords2 = 4;
+    int[] primaryKeys2 = new int[]{1, 2, 3, 4};
+    int[] timestamps2 = new int[]{150, 250, 300, 400};
+    ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
+    for (int i = 0; i < numRecords2; i++) {
+      validDocIds2.add(i);
+    }
+    List<PrimaryKey> primaryKeysList2 = getPrimaryKeyList(numRecords2, primaryKeys2);
+    ImmutableSegmentImpl segment2 = mockImmutableSegmentWithTimestamps(1, validDocIds2, null,
+        primaryKeysList2, timestamps2);
+    upsertMetadataManager.replaceSegment(segment2, segment1);
+
+    // Verify state after replacement - all records should be in new segment
+    assertEquals(recordLocationMap.size(), 4);
+    checkRecordLocation(recordLocationMap, 1, segment2, 0, 150, HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 2, segment2, 1, 250, HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 3, segment2, 2, 300, HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 4, segment2, 3, 400, HashFunction.NONE);
+
+    // New segment should have all docs valid
+    assertEquals(segment2.getValidDocIds().getMutableRoaringBitmap().getCardinality(), 4);
+
+    upsertMetadataManager.stop();
+    upsertMetadataManager.close();
+  }
+
+  @Test
+  public void testFullUpsertConsistencyNoneSameDocs() throws IOException {
+    // Test full upserts with consistency=NONE and same number of docs
+    UpsertContext upsertContext = _contextBuilder
+        .setConsistencyMode(UpsertConfig.ConsistencyMode.NONE)
+        .setDropOutOfOrderRecord(true)
+        .build();
+
+    ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+        new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, upsertContext);
+
+    // Create old segment with 3 records
+    int numRecords1 = 3;
+    int[] primaryKeys1 = new int[]{10, 20, 30};
+    int[] timestamps1 = new int[]{1000, 2000, 3000};
+    ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
+    for (int i = 0; i < numRecords1; i++) {
+      validDocIds1.add(i);
+    }
+    List<PrimaryKey> primaryKeysList1 = getPrimaryKeyList(numRecords1, primaryKeys1);
+    ImmutableSegmentImpl segment1 = mockImmutableSegmentWithTimestamps(1, validDocIds1, null,
+        primaryKeysList1, timestamps1);
+    List<RecordInfo> recordInfoList1 = getRecordInfoListWithIntegerComparison(numRecords1, primaryKeys1,
+        timestamps1, null);
+
+    upsertMetadataManager.addSegment(segment1, validDocIds1, null, recordInfoList1.iterator());
+    Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
+    assertEquals(recordLocationMap.size(), 3);
+
+    int numRecords2 = 3;
+    int[] primaryKeys2 = new int[]{10, 20, 30};
+    int[] timestamps2 = new int[]{1500, 2500, 3500};
+    ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
+    for (int i = 0; i < numRecords2; i++) {
+      validDocIds2.add(i);
+    }
+    List<PrimaryKey> primaryKeysList2 = getPrimaryKeyList(numRecords2, primaryKeys2);
+    ImmutableSegmentImpl segment2 = mockImmutableSegmentWithTimestamps(1, validDocIds2, null,
+        primaryKeysList2, timestamps2);
+    upsertMetadataManager.replaceSegment(segment2, segment1);
+    assertEquals(recordLocationMap.size(), 3);
+    checkRecordLocation(recordLocationMap, 10, segment2, 0, 1500, HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 20, segment2, 1, 2500, HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 30, segment2, 2, 3500, HashFunction.NONE);
+    assertEquals(segment2.getValidDocIds().getMutableRoaringBitmap().getCardinality(), 3);
+
+    upsertMetadataManager.stop();
+    upsertMetadataManager.close();
+  }
+
+  @Test
+  public void testFullUpsertConsistencyNoneOldSegmentMoreDocs()
+      throws Exception {
+    // Test full upserts with consistency=NONE where old segment has more docs
+    // Using real segments instead of mocks to avoid complex data source mocking
+    UpsertContext upsertContext =
+        _contextBuilder.setConsistencyMode(UpsertConfig.ConsistencyMode.NONE).setDropOutOfOrderRecord(true).build();
+
+    ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+        new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, upsertContext);
+
+    String segmentName = "test_segment";
+
+    // Create first real segment with 3 records
+    int[] primaryKeys1 = new int[]{10, 30, 40};
+    int[] timestamps1 = new int[]{1500, 3500, 4000};
+    ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
+    ImmutableSegmentImpl segment1 = createRealSegment(segmentName, primaryKeys1, timestamps1, validDocIds1);
+
+    upsertMetadataManager.addSegment(segment1);
+
+    Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
+    assertEquals(recordLocationMap.size(), 3);
+
+    // Create second real segment with 2 records (subset of first)
+    int[] primaryKeys2 = new int[]{10, 30};
+    int[] timestamps2 = new int[]{1500, 3500};
+    ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
+    ImmutableSegmentImpl segment2 = createRealSegment(segmentName, primaryKeys2, timestamps2, validDocIds2);
+
+    // Replace segment - RecordInfoReader will read real data from segment2
+    upsertMetadataManager.replaceSegment(segment2, segment1);
+
+    assertEquals(recordLocationMap.size(), 2);
+    checkRecordLocation(recordLocationMap, 10, segment2, 0, 1500, HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 30, segment2, 1, 3500, HashFunction.NONE);
+    assertEquals(segment2.getValidDocIds().getMutableRoaringBitmap().getCardinality(), 2);
+
+    upsertMetadataManager.stop();
+    upsertMetadataManager.close();
+
+    // Clean up real segments
+    segment1.destroy();
+    segment2.destroy();
+  }
+
+  @Test
+  public void testFullUpsertRegularConsistencyMode()
+      throws IOException {
+    // Test full upserts with regular consistency mode (not NONE) - no reversion should occur
+    UpsertContext upsertContext = _contextBuilder.setConsistencyMode(UpsertConfig.ConsistencyMode.SNAPSHOT).build();
+
+    ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+        new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, upsertContext);
+
+    int numRecords1 = 3;
+    int[] primaryKeys1 = new int[]{100, 200, 300};
+    int[] timestamps1 = new int[]{10000, 20000, 30000};
+    ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
+    for (int i = 0; i < numRecords1; i++) {
+      validDocIds1.add(i);
+    }
+    List<PrimaryKey> primaryKeysList1 = getPrimaryKeyList(numRecords1, primaryKeys1);
+    ImmutableSegmentImpl segment1 =
+        mockImmutableSegmentWithTimestamps(1, validDocIds1, null, primaryKeysList1, timestamps1);
+    List<RecordInfo> recordInfoList1 =
+        getRecordInfoListWithIntegerComparison(numRecords1, primaryKeys1, timestamps1, null);
+
+    upsertMetadataManager.addSegment(segment1, validDocIds1, null, recordInfoList1.iterator());
+
+    Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
+    assertEquals(recordLocationMap.size(), 3);
+    int numRecords2 = 1;
+    int[] primaryKeys2 = new int[]{100};
+    int[] timestamps2 = new int[]{15000};
+    ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
+    for (int i = 0; i < numRecords2; i++) {
+      validDocIds2.add(i);
+    }
+    List<PrimaryKey> primaryKeysList2 = getPrimaryKeyList(numRecords2, primaryKeys2);
+    ImmutableSegmentImpl segment2 =
+        mockImmutableSegmentWithTimestamps(1, validDocIds2, null, primaryKeysList2, timestamps2);
+
+    upsertMetadataManager.replaceSegment(segment2, segment1);
+
+    assertEquals(recordLocationMap.size(), 1);
+    checkRecordLocation(recordLocationMap, 100, segment2, 0, 15000, HashFunction.NONE);
+
+    assertEquals(segment1.getValidDocIds().getMutableRoaringBitmap().getCardinality(), 2);
+    assertEquals(segment2.getValidDocIds().getMutableRoaringBitmap().getCardinality(), 1);
+
+    upsertMetadataManager.stop();
+    upsertMetadataManager.close();
+  }
+
+  @Test
+  public void testRevertOnlyAppliesForConsumingSegmentSeal()
+      throws IOException {
+    UpsertContext upsertContext =
+        _contextBuilder.setConsistencyMode(UpsertConfig.ConsistencyMode.NONE).setDropOutOfOrderRecord(true).build();
+
+    ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+        new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, upsertContext);
+
+    int[] mutablePrimaryKeys = new int[]{10, 20, 30};
+    ThreadSafeMutableRoaringBitmap validDocIdsMutable = new ThreadSafeMutableRoaringBitmap();
+    MutableSegment mutableSegment = mockMutableSegmentWithDataSource(1, validDocIdsMutable, null, mutablePrimaryKeys);
+
+    upsertMetadataManager.addRecord(mutableSegment, new RecordInfo(makePrimaryKey(10), 0, new Integer(1000), false));
+    upsertMetadataManager.addRecord(mutableSegment, new RecordInfo(makePrimaryKey(20), 1, new Integer(2000), false));
+    upsertMetadataManager.addRecord(mutableSegment, new RecordInfo(makePrimaryKey(30), 2, new Integer(3000), false));

Review Comment:
   Avoid the deprecated `new Integer()` constructor. Use `Integer.valueOf()` or
autoboxing instead. The `new Integer()` constructor has been deprecated since
Java 9 and marked for removal since Java 16.
   ```suggestion
       upsertMetadataManager.addRecord(mutableSegment, new RecordInfo(makePrimaryKey(10), 0, Integer.valueOf(1000), false));
       upsertMetadataManager.addRecord(mutableSegment, new RecordInfo(makePrimaryKey(20), 1, Integer.valueOf(2000), false));
       upsertMetadataManager.addRecord(mutableSegment, new RecordInfo(makePrimaryKey(30), 2, Integer.valueOf(3000), false));
   ```



##########
pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java:
##########
@@ -808,26 +849,127 @@ private List<PrimaryKey> getPrimaryKeyList(int numRecords, int[] primaryKeys) {
   private static ImmutableSegmentImpl mockImmutableSegment(int sequenceNumber,
       ThreadSafeMutableRoaringBitmap validDocIds, @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds,
       List<PrimaryKey> primaryKeys) {
+    return mockImmutableSegmentWithTimestamps(sequenceNumber, validDocIds, queryableDocIds, primaryKeys, null);
+  }
+
+  private static ImmutableSegmentImpl mockImmutableSegmentWithTimestamps(int sequenceNumber,
+      ThreadSafeMutableRoaringBitmap validDocIds, @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds,
+      List<PrimaryKey> primaryKeys, @Nullable int[] timestamps) {
     ImmutableSegmentImpl segment = mock(ImmutableSegmentImpl.class);
     when(segment.getSegmentName()).thenReturn(getSegmentName(sequenceNumber));
     when(segment.getValidDocIds()).thenReturn(validDocIds);
     when(segment.getQueryableDocIds()).thenReturn(queryableDocIds);
-    DataSource dataSource = mock(DataSource.class);
-    when(segment.getDataSource(anyString())).thenReturn(dataSource);
-    ForwardIndexReader forwardIndex = mock(ForwardIndexReader.class);
-    when(forwardIndex.isSingleValue()).thenReturn(true);
-    when(forwardIndex.getStoredType()).thenReturn(DataType.INT);
-    when(forwardIndex.getInt(anyInt(), any())).thenAnswer(
-        invocation -> primaryKeys.get(invocation.getArgument(0)).getValues()[0]);
-    when(dataSource.getForwardIndex()).thenReturn(forwardIndex);
+
+    // Enhanced mocking for RecordInfoReader to work properly
+    // Mock primary key column data source
+    DataSource primaryKeyDataSource = mock(DataSource.class);
+    ForwardIndexReader primaryKeyForwardIndex = mock(ForwardIndexReader.class);
+    when(primaryKeyForwardIndex.isSingleValue()).thenReturn(true);
+    when(primaryKeyForwardIndex.getStoredType()).thenReturn(DataType.INT);
+    when(primaryKeyForwardIndex.createContext()).thenReturn(null);
+    when(primaryKeyForwardIndex.getInt(anyInt(), any())).thenAnswer(invocation -> {
+      int docId = invocation.getArgument(0);
+      if (primaryKeys != null && docId < primaryKeys.size()) {
+        return (Integer) primaryKeys.get(docId).getValues()[0];
+      }
+      return 1000 + docId; // Default fallback
+    });
+    when(primaryKeyDataSource.getForwardIndex()).thenReturn(primaryKeyForwardIndex);
+
+    // Mock comparison column data source
+    DataSource comparisonDataSource = mock(DataSource.class);
+    ForwardIndexReader comparisonForwardIndex = mock(ForwardIndexReader.class);
+    when(comparisonForwardIndex.isSingleValue()).thenReturn(true);
+    when(comparisonForwardIndex.getStoredType()).thenReturn(DataType.INT);
+    when(comparisonForwardIndex.createContext()).thenReturn(null);
+    when(comparisonForwardIndex.getInt(anyInt(), any())).thenAnswer(invocation -> {
+      int docId = invocation.getArgument(0);
+      // Return actual timestamp values if provided, otherwise default values
+      if (timestamps != null && docId < timestamps.length) {
+        return timestamps[docId];
+      }
+      return 1000 + (docId * 100); // Default fallback

Review Comment:
   The magic numbers `1000` and `100` are used as fallback values without 
explanation. Consider extracting these to named constants like 
`DEFAULT_TIMESTAMP_BASE` and `DEFAULT_TIMESTAMP_STEP` to improve code 
maintainability and document the intent.
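
A minimal sketch of that refactor, using the constant names proposed above (hypothetical; they are not in the PR) and the same fallback arithmetic as the mock, with the Mockito wiring omitted:

```java
final class TimestampFallbackSketch {
  // Base value and per-doc step for synthetic timestamps when no explicit timestamps are supplied.
  static final int DEFAULT_TIMESTAMP_BASE = 1000;
  static final int DEFAULT_TIMESTAMP_STEP = 100;

  // Mirrors the mock's answer: the provided timestamp if present, otherwise a synthetic
  // value that increases with docId.
  static int timestampFor(int docId, int[] timestamps) {
    if (timestamps != null && docId < timestamps.length) {
      return timestamps[docId];
    }
    return DEFAULT_TIMESTAMP_BASE + docId * DEFAULT_TIMESTAMP_STEP;
  }

  public static void main(String[] args) {
    System.out.println(timestampFor(1, new int[]{150, 250})); // prints "250"
    System.out.println(timestampFor(3, null)); // prints "1300"
  }
}
```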



##########
pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java:
##########
@@ -808,26 +849,127 @@ private List<PrimaryKey> getPrimaryKeyList(int numRecords, int[] primaryKeys) {
   private static ImmutableSegmentImpl mockImmutableSegment(int sequenceNumber,
       ThreadSafeMutableRoaringBitmap validDocIds, @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds,
       List<PrimaryKey> primaryKeys) {
+    return mockImmutableSegmentWithTimestamps(sequenceNumber, validDocIds, queryableDocIds, primaryKeys, null);
+  }
+
+  private static ImmutableSegmentImpl mockImmutableSegmentWithTimestamps(int sequenceNumber,
+      ThreadSafeMutableRoaringBitmap validDocIds, @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds,
+      List<PrimaryKey> primaryKeys, @Nullable int[] timestamps) {
     ImmutableSegmentImpl segment = mock(ImmutableSegmentImpl.class);
     when(segment.getSegmentName()).thenReturn(getSegmentName(sequenceNumber));
     when(segment.getValidDocIds()).thenReturn(validDocIds);
     when(segment.getQueryableDocIds()).thenReturn(queryableDocIds);
-    DataSource dataSource = mock(DataSource.class);
-    when(segment.getDataSource(anyString())).thenReturn(dataSource);
-    ForwardIndexReader forwardIndex = mock(ForwardIndexReader.class);
-    when(forwardIndex.isSingleValue()).thenReturn(true);
-    when(forwardIndex.getStoredType()).thenReturn(DataType.INT);
-    when(forwardIndex.getInt(anyInt(), any())).thenAnswer(
-        invocation -> primaryKeys.get(invocation.getArgument(0)).getValues()[0]);
-    when(dataSource.getForwardIndex()).thenReturn(forwardIndex);
+
+    // Enhanced mocking for RecordInfoReader to work properly
+    // Mock primary key column data source
+    DataSource primaryKeyDataSource = mock(DataSource.class);
+    ForwardIndexReader primaryKeyForwardIndex = mock(ForwardIndexReader.class);
+    when(primaryKeyForwardIndex.isSingleValue()).thenReturn(true);
+    when(primaryKeyForwardIndex.getStoredType()).thenReturn(DataType.INT);
+    when(primaryKeyForwardIndex.createContext()).thenReturn(null);
+    when(primaryKeyForwardIndex.getInt(anyInt(), any())).thenAnswer(invocation -> {
+      int docId = invocation.getArgument(0);
+      if (primaryKeys != null && docId < primaryKeys.size()) {
+        return (Integer) primaryKeys.get(docId).getValues()[0];
+      }
+      return 1000 + docId; // Default fallback

Review Comment:
   The magic number `1000` is used as a fallback value without explanation. Consider extracting it into a named constant such as `DEFAULT_PRIMARY_KEY_BASE` or `FALLBACK_PRIMARY_KEY_OFFSET` to improve code maintainability and make the intent clearer.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -659,37 +665,107 @@ public void replaceSegment(ImmutableSegment segment, @Nullable ThreadSafeMutable
       validDocIdsForOldSegment = getValidDocIdsForOldSegment(oldSegment);
     }
     if (validDocIdsForOldSegment != null && !validDocIdsForOldSegment.isEmpty()) {
-      int numKeysNotReplaced = validDocIdsForOldSegment.getCardinality();
-      // Add the new metric tracking here
-      if (_context.isDropOutOfOrderRecord() && _context.getConsistencyMode() == UpsertConfig.ConsistencyMode.NONE) {
-        // For Upsert tables when some of the records get dropped when dropOutOfOrderRecord is enabled, we donot
-        // store the original record location when keys are not replaced, this can potentially cause inconsistencies
-        // leading to some rows not getting dropped when reconsumed. This can be caused when a consuming segment
-        // that is consumed from a different server is replaced with the existing segment which consumed rows ahead
-        // of the other server
-        _logger.warn(
-            "Found {} primary keys not replaced when replacing segment: {} for upsert table with dropOutOfOrderRecord"
-                + " enabled with no consistency mode. This can potentially cause inconsistency between replicas",
-            numKeysNotReplaced, segmentName);
-        _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.REALTIME_UPSERT_INCONSISTENT_ROWS,
-            numKeysNotReplaced);
-      } else if (_partialUpsertHandler != null) {
-        // For partial-upsert table, because we do not restore the original record location when removing the primary
-        // keys not replaced, it can potentially cause inconsistency between replicas. This can happen when a
-        // consuming segment is replaced by a committed segment that is consumed from a different server with
-        // different records (some stream consumer cannot guarantee consuming the messages in the same order/
-        // when a segment is replaced with lesser consumed rows from the other server).
-        _logger.warn("Found {} primary keys not replaced when replacing segment: {} for partial-upsert table. This "
-            + "can potentially cause inconsistency between replicas", numKeysNotReplaced, segmentName);
-        _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.PARTIAL_UPSERT_KEYS_NOT_REPLACED,
-            numKeysNotReplaced);
+      checkForInconsistencies(segment, validDocIds, queryableDocIds, oldSegment, validDocIdsForOldSegment, segmentName);
+      removeSegment(oldSegment, validDocIdsForOldSegment);
+    }
+  }
+
+  void checkForInconsistencies(ImmutableSegment segment, ThreadSafeMutableRoaringBitmap validDocIds,
+      ThreadSafeMutableRoaringBitmap queryableDocIds, IndexSegment oldSegment,
+      MutableRoaringBitmap validDocIdsForOldSegment, String segmentName) {
+    int numKeysNotReplaced = validDocIdsForOldSegment.getCardinality();
+    // Only apply revert logic when sealing a consuming (mutable) segment, not for immutable segment replacement.
+    // Consuming segments are mutable, so we check if the old segment is NOT an ImmutableSegment.
+    boolean isConsumingSegmentSeal = !(oldSegment instanceof ImmutableSegment);
+    if (isConsumingSegmentSeal && _context.isDropOutOfOrderRecord()
+        && _context.getConsistencyMode() == UpsertConfig.ConsistencyMode.NONE) {
+      // For Upsert tables when some of the records get dropped when dropOutOfOrderRecord is enabled, we donot
+      // store the original record location when keys are not replaced, this can potentially cause inconsistencies
+      // leading to some rows not getting dropped when reconsumed. This can be caused when a consuming segment
+      // that is consumed from a different server is replaced with the existing segment which consumed rows ahead
+      // of the other server
+      _logger.warn("Found {} primary keys not replaced when sealing consuming segment: {} for upsert table with "
+              + "dropOutOfOrderRecord enabled with no consistency mode. This can potentially cause inconsistency "
+              + "between replicas. Reverting back metadata changes and triggering segment replacement.",
+          numKeysNotReplaced, segmentName);
+      // Revert consuming segment pks to previous segment locations and perform metadata replacement again with retry
+      revertSegmentUpsertMetadataWithRetry(segment, validDocIds, queryableDocIds, oldSegment, segmentName);
+    } else if (isConsumingSegmentSeal && _partialUpsertHandler != null) {
+      // For partial-upsert table, because we do not restore the original record location when removing the primary
+      // keys not replaced, it can potentially cause inconsistency between replicas. This can happen when a
+      // consuming segment is replaced by a committed segment that is consumed from a different server with
+      // different records (some stream consumer cannot guarantee consuming the messages in the same order/
+      // when a segment is replaced with lesser consumed rows from the other server).
+      _logger.warn("Found {} primary keys not replaced when sealing consuming segment: {} for partial-upsert table. "
+          + "This can potentially cause inconsistency between replicas. "
+          + "Reverting metadata changes and triggering segment replacement.", numKeysNotReplaced, segmentName);
+      // Revert consuming segment pks to previous segment locations and perform metadata replacement again with retry
+      revertSegmentUpsertMetadataWithRetry(segment, validDocIds, queryableDocIds, oldSegment, segmentName);
+    } else if (isConsumingSegmentSeal) {
+      _logger.info("Found {} primary keys not replaced when sealing consuming segment: {}", numKeysNotReplaced,
+          segmentName);
+    } else {
+      // For immutable segment replacement (e.g., segment reload), just log the info without revert
+      _logger.warn("Found {} primary keys not replaced when replacing immutable segment: {}. "
+          + "Skipping revert as this is not a consuming segment.", numKeysNotReplaced, segmentName);
+    }
+  }
+
+  /**
+   * Reverts segment upsert metadata and retries addOrReplaceSegment with a maximum retry limit to prevent infinite
+   * recursion in case of persistent inconsistencies.
+   */
+  void revertSegmentUpsertMetadataWithRetry(ImmutableSegment segment, ThreadSafeMutableRoaringBitmap validDocIds,
+      ThreadSafeMutableRoaringBitmap queryableDocIds, IndexSegment oldSegment, String segmentName) {
+    for (int retryCount = 0; retryCount < MAX_UPSERT_REVERT_RETRIES; retryCount++) {
+      revertCurrentSegmentUpsertMetadata(oldSegment, validDocIds, queryableDocIds);
+      MutableRoaringBitmap validDocIdsForOldSegment = getValidDocIdsForOldSegment(oldSegment);
+      try (UpsertUtils.RecordInfoReader recordInfoReader = new UpsertUtils.RecordInfoReader(segment, _primaryKeyColumns,
+          _comparisonColumns, _deleteRecordColumn)) {
+        Iterator<RecordInfo> latestRecordInfoIterator =
+            UpsertUtils.getRecordInfoIterator(recordInfoReader, segment.getSegmentMetadata().getTotalDocs());
+        addOrReplaceSegment((ImmutableSegmentImpl) segment, validDocIds, queryableDocIds, latestRecordInfoIterator,
+            oldSegment, validDocIdsForOldSegment);
+      } catch (Exception e) {
+        throw new RuntimeException(
+            String.format("Caught exception while replacing segment metadata during inconsistencies: %s, table: %s",
+                segmentName, _tableNameWithType), e);
+      }
+
+      validDocIdsForOldSegment = getValidDocIdsForOldSegment(oldSegment);
+      if (validDocIdsForOldSegment == null || validDocIdsForOldSegment.isEmpty()) {
+        _logger.info("Successfully resolved inconsistency for segment: {} after {} retry attempt(s)", segmentName,
+            retryCount + 1);
+        return;
+      }
+
+      int numKeysStillNotReplaced = validDocIdsForOldSegment.getCardinality();
+      if (retryCount < MAX_UPSERT_REVERT_RETRIES - 1) {
+        _logger.warn("Retry {}/{}: Still found {} primary keys not replaced for segment: {}. Retrying...",
+            retryCount + 1, MAX_UPSERT_REVERT_RETRIES, numKeysStillNotReplaced, segmentName);
       } else {
-        _logger.info("Found {} primary keys not replaced when replacing segment: {}", numKeysNotReplaced, segmentName);
+        _logger.error("Exhausted all {} retries for segment: {}. Found {} primary keys still not replaced. "
+                + "Proceeding with current state which may cause inconsistency.", MAX_UPSERT_REVERT_RETRIES,
+            segmentName,
+            numKeysStillNotReplaced);

Review Comment:
   The log message spans multiple lines with inconsistent indentation. The third parameter `segmentName` should be aligned with the other parameters for better readability. Consider formatting as `_logger.error("Exhausted all {} retries for segment: {}. Found {} primary keys still not replaced. Proceeding with current state which may cause inconsistency.", MAX_UPSERT_REVERT_RETRIES, segmentName, numKeysStillNotReplaced);`
   ```suggestion
               + "Proceeding with current state which may cause inconsistency.", MAX_UPSERT_REVERT_RETRIES, segmentName,
               numKeysStillNotReplaced);
   ```
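The branch structure of `checkForInconsistencies` in the diff above can be summarized as a small decision function. This is an illustrative model only: the boolean inputs and the local `ConsistencyMode` enum are hypothetical stand-ins for `IndexSegment`, `UpsertContext`, `PartialUpsertHandler` and `UpsertConfig.ConsistencyMode`, and the `Action` enum is not part of Pinot.

```java
// Illustrative model of the branches in checkForInconsistencies() from the diff above.
// All types here are hypothetical stand-ins, not Pinot API.
final class InconsistencyDecision {
  // Local stand-in for UpsertConfig.ConsistencyMode; only NONE matters to these branches.
  enum ConsistencyMode { NONE, SYNC, SNAPSHOT }

  enum Action {
    REVERT_AND_RETRY,  // warn, then call revertSegmentUpsertMetadataWithRetry(...)
    LOG_INFO,          // consuming segment sealed, but no revert condition applies
    LOG_WARN_NO_REVERT // immutable segment replaced (e.g. reload): never revert
  }

  static Action decide(boolean oldSegmentIsImmutable, boolean dropOutOfOrderRecord,
      ConsistencyMode consistencyMode, boolean hasPartialUpsertHandler) {
    // Only sealing a consuming (mutable) segment is eligible for the revert path.
    boolean isConsumingSegmentSeal = !oldSegmentIsImmutable;
    if (isConsumingSegmentSeal && dropOutOfOrderRecord && consistencyMode == ConsistencyMode.NONE) {
      return Action.REVERT_AND_RETRY;
    } else if (isConsumingSegmentSeal && hasPartialUpsertHandler) {
      return Action.REVERT_AND_RETRY;
    } else if (isConsumingSegmentSeal) {
      return Action.LOG_INFO;
    }
    return Action.LOG_WARN_NO_REVERT;
  }

  public static void main(String[] args) {
    System.out.println(decide(false, true, ConsistencyMode.NONE, false)); // REVERT_AND_RETRY
    System.out.println(decide(false, true, ConsistencyMode.SYNC, false)); // LOG_INFO
    System.out.println(decide(false, false, ConsistencyMode.NONE, true)); // REVERT_AND_RETRY
    System.out.println(decide(true, true, ConsistencyMode.NONE, true));   // LOG_WARN_NO_REVERT
  }
}
```

In this model the revert path is reachable only when the old segment is not an `ImmutableSegment`; replacing an immutable segment only produces a warning, whatever the upsert settings.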



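The bounded retry in `revertSegmentUpsertMetadataWithRetry` follows a common pattern: repeat a revert-and-reapply round until no primary keys remain on the old segment, or stop after a fixed number of attempts. The sketch below assumes a hypothetical `IntSupplier` that performs one round and returns the number of keys still not replaced. The value of `MAX_UPSERT_REVERT_RETRIES` is not shown in this diff, so the limit is passed in as a parameter.

```java
import java.util.function.IntSupplier;

// Hypothetical sketch of the bounded retry pattern in revertSegmentUpsertMetadataWithRetry().
// revertAndReapply stands in for revertCurrentSegmentUpsertMetadata() + addOrReplaceSegment()
// and returns how many primary keys still point at the old segment afterwards.
final class BoundedRevertRetry {
  // Returns the 1-based attempt on which no keys remained, or -1 if every attempt left keys
  // behind (the PR logs an error and proceeds with the current state in that case).
  static int revertWithRetry(IntSupplier revertAndReapply, int maxRetries) {
    for (int retryCount = 0; retryCount < maxRetries; retryCount++) {
      int numKeysStillNotReplaced = revertAndReapply.getAsInt();
      if (numKeysStillNotReplaced == 0) {
        return retryCount + 1;
      }
      if (retryCount < maxRetries - 1) {
        System.out.printf("Retry %d/%d: still %d keys not replaced, retrying%n",
            retryCount + 1, maxRetries, numKeysStillNotReplaced);
      } else {
        System.out.printf("Exhausted %d retries with %d keys not replaced%n",
            maxRetries, numKeysStillNotReplaced);
      }
    }
    return -1;
  }

  public static void main(String[] args) {
    // Fake round that leaves 4, then 1, then 0 keys behind on successive calls.
    int[] remaining = {4, 1, 0};
    int[] call = {0};
    IntSupplier fake = () -> remaining[Math.min(call[0]++, remaining.length - 1)];
    System.out.println(revertWithRetry(fake, 5)); // 3: resolved on the third attempt
    call[0] = 0;
    System.out.println(revertWithRetry(fake, 2)); // -1: exhausted after two attempts
  }
}
```

Capping the number of attempts, rather than re-entering `addOrReplaceSegment` until the inconsistency disappears, is what guarantees termination; this matches the Javadoc's stated goal of preventing infinite recursion.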
-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

