Copilot commented on code in PR #17324:
URL: https://github.com/apache/pinot/pull/17324#discussion_r2654806981
##########
pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java:
##########
@@ -83,18 +95,38 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
private static final String DELETE_RECORD_COLUMN = "deleteCol";
private static final File INDEX_DIR =
new File(FileUtils.getTempDirectory(), "ConcurrentMapPartitionUpsertMetadataManagerTest");
+ private static final File SEGMENT_DIR = new File(INDEX_DIR, "segments");
+
+ private static final int MOCK_FALLBACK_BASE_OFFSET = 1000;
Review Comment:
The constant name `MOCK_FALLBACK_BASE_OFFSET` is ambiguous. Consider
renaming it to `MOCK_DEFAULT_PRIMARY_KEY_BASE` or
`MOCK_FALLBACK_PRIMARY_KEY_OFFSET` to clarify that it's used as a fallback for
generating primary key values in mock scenarios.
```suggestion
private static final int MOCK_FALLBACK_PRIMARY_KEY_OFFSET = 1000;
```
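For illustration, here is a Mockito-free sketch of the fallback logic that the mocked getInt(...) answers in mockImmutableSegmentWithTimestamps implement, using the suggested name. The class and method names are hypothetical. Primary keys are simplified to a List<Integer> instead of List<PrimaryKey>.

```java
import java.util.List;

// Hypothetical, Mockito-free stand-in for the two getInt(...) answers stubbed in
// mockImmutableSegmentWithTimestamps. Primary keys are simplified to List<Integer>.
final class MockForwardIndexValues {
  // Name as suggested in the review comment; value taken from the diff.
  static final int MOCK_FALLBACK_PRIMARY_KEY_OFFSET = 1000;

  private MockForwardIndexValues() {
  }

  // Primary key column: the real key when docId is in range, else offset + docId.
  static int primaryKeyValue(List<Integer> primaryKeys, int docId) {
    if (primaryKeys != null && docId < primaryKeys.size()) {
      return primaryKeys.get(docId);
    }
    return MOCK_FALLBACK_PRIMARY_KEY_OFFSET + docId;
  }

  // Comparison column: the real timestamp when available, else offset + docId * 100.
  static int comparisonValue(int[] timestamps, int docId) {
    if (timestamps != null && docId < timestamps.length) {
      return timestamps[docId];
    }
    return MOCK_FALLBACK_PRIMARY_KEY_OFFSET + docId * 100;
  }
}
```

In this PR the same constant also seeds the comparison-column fallback (offset + docId * 100). A name that does not refer only to primary keys might therefore describe both uses more accurately.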
##########
pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java:
##########
@@ -808,26 +851,127 @@ private List<PrimaryKey> getPrimaryKeyList(int numRecords, int[] primaryKeys) {
private static ImmutableSegmentImpl mockImmutableSegment(int sequenceNumber,
ThreadSafeMutableRoaringBitmap validDocIds, @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds,
List<PrimaryKey> primaryKeys) {
+ return mockImmutableSegmentWithTimestamps(sequenceNumber, validDocIds, queryableDocIds, primaryKeys, null);
+ }
+
+ private static ImmutableSegmentImpl mockImmutableSegmentWithTimestamps(int sequenceNumber,
+ ThreadSafeMutableRoaringBitmap validDocIds, @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds,
+ List<PrimaryKey> primaryKeys, @Nullable int[] timestamps) {
ImmutableSegmentImpl segment = mock(ImmutableSegmentImpl.class);
when(segment.getSegmentName()).thenReturn(getSegmentName(sequenceNumber));
when(segment.getValidDocIds()).thenReturn(validDocIds);
when(segment.getQueryableDocIds()).thenReturn(queryableDocIds);
- DataSource dataSource = mock(DataSource.class);
- when(segment.getDataSource(anyString())).thenReturn(dataSource);
- ForwardIndexReader forwardIndex = mock(ForwardIndexReader.class);
- when(forwardIndex.isSingleValue()).thenReturn(true);
- when(forwardIndex.getStoredType()).thenReturn(DataType.INT);
- when(forwardIndex.getInt(anyInt(), any())).thenAnswer(
- invocation -> primaryKeys.get(invocation.getArgument(0)).getValues()[0]);
- when(dataSource.getForwardIndex()).thenReturn(forwardIndex);
+
+ // Enhanced mocking for RecordInfoReader to work properly
+ // Mock primary key column data source
+ DataSource primaryKeyDataSource = mock(DataSource.class);
+ ForwardIndexReader primaryKeyForwardIndex = mock(ForwardIndexReader.class);
+ when(primaryKeyForwardIndex.isSingleValue()).thenReturn(true);
+ when(primaryKeyForwardIndex.getStoredType()).thenReturn(DataType.INT);
+ when(primaryKeyForwardIndex.createContext()).thenReturn(null);
+ when(primaryKeyForwardIndex.getInt(anyInt(), any())).thenAnswer(invocation -> {
+ int docId = invocation.getArgument(0);
+ if (primaryKeys != null && docId < primaryKeys.size()) {
+ return (Integer) primaryKeys.get(docId).getValues()[0];
+ }
+ return MOCK_FALLBACK_BASE_OFFSET + docId;
+ });
+ when(primaryKeyDataSource.getForwardIndex()).thenReturn(primaryKeyForwardIndex);
+
+ // Mock comparison column data source
+ DataSource comparisonDataSource = mock(DataSource.class);
+ ForwardIndexReader comparisonForwardIndex = mock(ForwardIndexReader.class);
+ when(comparisonForwardIndex.isSingleValue()).thenReturn(true);
+ when(comparisonForwardIndex.getStoredType()).thenReturn(DataType.INT);
+ when(comparisonForwardIndex.createContext()).thenReturn(null);
+ when(comparisonForwardIndex.getInt(anyInt(), any())).thenAnswer(invocation -> {
+ int docId = invocation.getArgument(0);
+ // Return actual timestamp values if provided, otherwise default values
+ if (timestamps != null && docId < timestamps.length) {
+ return timestamps[docId];
+ }
+ return MOCK_FALLBACK_BASE_OFFSET + (docId * 100);
+ });
+ when(comparisonDataSource.getForwardIndex()).thenReturn(comparisonForwardIndex);
+
+ // Set up data source mapping - IMPORTANT: anyString() must be registered FIRST,
+ // then specific matchers override it (Mockito uses last matching stub)
Review Comment:
The comment's claim that 'Mockito uses last matching stub' is accurate: when
several stubbings of the same method match an invocation, Mockito applies the
most recently registered one, not the most specific one. The code relies on
exactly that (anyString() fallback first, specific matchers afterwards), so
only the wording needs tightening to make the ordering requirement explicit.
```suggestion
// Set up data source mapping: register the generic anyString() fallback FIRST, then the
// specific column matchers (Mockito uses the most recently registered matching stub)
```
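Mockito's documentation notes that the order of stubbing matters and that the last matching stubbing wins. The toy model below reproduces that newest-first lookup. It is a hypothetical class, not Mockito's API or implementation. It shows why the anyString() fallback has to be registered before the specific column matchers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Toy model (not Mockito's API) of overlapping stubs on one method: the most
// recently registered stub whose matcher accepts the argument wins, regardless
// of how specific the matcher is.
final class ToyStubRegistry<A, R> {
  private final class Stub {
    final Predicate<A> matcher;
    final R answer;

    Stub(Predicate<A> matcher, R answer) {
      this.matcher = matcher;
      this.answer = answer;
    }
  }

  private final List<Stub> _stubs = new ArrayList<>();

  // Similar in spirit to when(mock.method(matcher)).thenReturn(answer).
  void stub(Predicate<A> matcher, R answer) {
    _stubs.add(new Stub(matcher, answer));
  }

  // Search newest-first; return null (an unstubbed object default) if nothing matches.
  R invoke(A arg) {
    for (int i = _stubs.size() - 1; i >= 0; i--) {
      Stub stub = _stubs.get(i);
      if (stub.matcher.test(arg)) {
        return stub.answer;
      }
    }
    return null;
  }
}
```

If the fallback is registered first, a lookup for "pk" resolves to the specific stub. If the two registrations are reversed, the fallback shadows the specific stub.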
##########
pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java:
##########
@@ -1724,4 +1903,414 @@ public int hashCode() {
return _value;
}
}
+
+ // Tests for upsert metadata reversion functionality
+ @Test
+ public void testPartialUpsertSameDocsReplacement() throws IOException {
+ // Test partial upserts with old and new segments having same number of docs
+ // This test verifies that when all keys are present, no reversion occurs
+ PartialUpsertHandler mockPartialUpsertHandler = mock(PartialUpsertHandler.class);
+ UpsertContext upsertContext = _contextBuilder.setPartialUpsertHandler(mockPartialUpsertHandler)
+ .setConsistencyMode(UpsertConfig.ConsistencyMode.NONE).build();
+
+ ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+ new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, upsertContext);
+
+ int numRecords1 = 3;
+ int[] primaryKeys1 = new int[]{1, 2, 3};
+ int[] timestamps1 = new int[]{100, 200, 300};
+ ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
+ for (int i = 0; i < numRecords1; i++) {
+ validDocIds1.add(i);
+ }
+ List<PrimaryKey> primaryKeysList1 = getPrimaryKeyList(numRecords1, primaryKeys1);
+ ImmutableSegmentImpl segment1 = mockImmutableSegmentWithTimestamps(1, validDocIds1, null,
+ primaryKeysList1, timestamps1);
+ List<RecordInfo> recordInfoList1 = getRecordInfoListWithIntegerComparison(numRecords1, primaryKeys1,
+ timestamps1, null);
+
+ upsertMetadataManager.addSegment(segment1, validDocIds1, null, recordInfoList1.iterator());
+
+ Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
+ assertEquals(recordLocationMap.size(), 3);
+ checkRecordLocation(recordLocationMap, 1, segment1, 0, 100, HashFunction.NONE);
+ checkRecordLocation(recordLocationMap, 2, segment1, 1, 200, HashFunction.NONE);
+ checkRecordLocation(recordLocationMap, 3, segment1, 2, 300, HashFunction.NONE);
+
+ // Create new segment with same 3 records but updated timestamps
+ int numRecords2 = 3;
+ int[] primaryKeys2 = new int[]{1, 2, 3};
+ int[] timestamps2 = new int[]{150, 250, 350};
+ ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
+ for (int i = 0; i < numRecords2; i++) {
+ validDocIds2.add(i);
+ }
+ List<PrimaryKey> primaryKeysList2 = getPrimaryKeyList(numRecords2, primaryKeys2);
+ ImmutableSegmentImpl segment2 = mockImmutableSegmentWithTimestamps(1, validDocIds2, null,
+ primaryKeysList2, timestamps2);
+ List<RecordInfo> recordInfoList2 = getRecordInfoListWithIntegerComparison(numRecords2, primaryKeys2,
+ timestamps2, null);
+
+ // Replace segment - should trigger reversion logic but no reversion needed since all keys are present
+ upsertMetadataManager.replaceSegment(segment2, segment1);
+
+ // Verify replacement - all records should be in new segment
+ assertEquals(recordLocationMap.size(), 3);
+ checkRecordLocation(recordLocationMap, 1, segment2, 0, 150, HashFunction.NONE);
+ checkRecordLocation(recordLocationMap, 2, segment2, 1, 250, HashFunction.NONE);
+ checkRecordLocation(recordLocationMap, 3, segment2, 2, 350, HashFunction.NONE);
+
+ // New segment should have all docs valid
+ assertEquals(segment2.getValidDocIds().getMutableRoaringBitmap().getCardinality(), 3);
+
+ upsertMetadataManager.stop();
+ upsertMetadataManager.close();
+ }
+
+ @Test
+ public void testPartialUpsertOldSegmentTriggerReversion() throws IOException {
+ // Test partial upserts with consuming (mutable) segment being sealed - revert should be triggered
+ // Note: Revert logic only applies when sealing a consuming segment, not for immutable segment replacement
+ PartialUpsertHandler mockPartialUpsertHandler = mock(PartialUpsertHandler.class);
+ UpsertContext upsertContext = _contextBuilder.setPartialUpsertHandler(mockPartialUpsertHandler)
+ .setConsistencyMode(UpsertConfig.ConsistencyMode.NONE).build();
+
+ ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+ new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, upsertContext);
+
+ // Create a mutable (consuming) segment with 4 records - use mockMutableSegmentWithDataSource
+ // to support removeSegment which needs to read primary keys
+ int[] mutablePrimaryKeys = new int[]{1, 2, 3, 4};
+ ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
+ MutableSegment mutableSegment = mockMutableSegmentWithDataSource(1, validDocIds1, null, mutablePrimaryKeys);
+
+ // Add records to the mutable segment
+ upsertMetadataManager.addRecord(mutableSegment, new RecordInfo(makePrimaryKey(1), 0, 100, false));
+ upsertMetadataManager.addRecord(mutableSegment, new RecordInfo(makePrimaryKey(2), 1, 200, false));
+ upsertMetadataManager.addRecord(mutableSegment, new RecordInfo(makePrimaryKey(3), 2, 300, false));
+ upsertMetadataManager.addRecord(mutableSegment, new RecordInfo(makePrimaryKey(4), 3, 400, false));
+
+ Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
+ assertEquals(recordLocationMap.size(), 4);
+ assertEquals(validDocIds1.getMutableRoaringBitmap().getCardinality(), 4);
+
+ int numRecords2 = 2;
+ int[] primaryKeys2 = new int[]{1, 3};
+ int[] timestamps2 = new int[]{150, 350};
+ ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
+ List<PrimaryKey> primaryKeysList2 = getPrimaryKeyList(numRecords2, primaryKeys2);
+ ImmutableSegmentImpl segment2 = mockImmutableSegmentWithTimestamps(1, validDocIds2, null,
+ primaryKeysList2, timestamps2);
+
+ // Replace mutable with immutable (consuming segment seal) - revert SHOULD be triggered
+ upsertMetadataManager.replaceSegment(segment2, validDocIds2, null,
+ getRecordInfoListWithIntegerComparison(numRecords2, primaryKeys2, timestamps2, null).iterator(),
+ mutableSegment);
+
+ assertEquals(recordLocationMap.size(), 2);
+ assertEquals(segment2.getValidDocIds().getMutableRoaringBitmap().getCardinality(), 2);
+ checkRecordLocation(recordLocationMap, 1, segment2, 0, 150, HashFunction.NONE);
+ checkRecordLocation(recordLocationMap, 3, segment2, 1, 350, HashFunction.NONE);
+
+ // Mutable segment's validDocIds should be 0 after removal
+ assertEquals(validDocIds1.getMutableRoaringBitmap().getCardinality(), 0);
+ upsertMetadataManager.stop();
+ upsertMetadataManager.close();
+ }
+
+ @Test
+ public void testPartialUpsertOldSegmentLesserDocs() throws IOException {
+ // Test partial upserts with old segment having fewer docs than new segment
+ PartialUpsertHandler mockPartialUpsertHandler = mock(PartialUpsertHandler.class);
+ UpsertContext upsertContext = _contextBuilder.setPartialUpsertHandler(mockPartialUpsertHandler)
+ .setConsistencyMode(UpsertConfig.ConsistencyMode.NONE).build();
+
+ ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+ new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, upsertContext);
+
+ int numRecords1 = 2;
+ int[] primaryKeys1 = new int[]{1, 2};
+ int[] timestamps1 = new int[]{100, 200};
+ ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
+ for (int i = 0; i < numRecords1; i++) {
+ validDocIds1.add(i);
+ }
+ List<PrimaryKey> primaryKeysList1 = getPrimaryKeyList(numRecords1, primaryKeys1);
+ ImmutableSegmentImpl segment1 =
+ mockImmutableSegmentWithTimestamps(1, validDocIds1, null, primaryKeysList1, timestamps1);
+ List<RecordInfo> recordInfoList1 =
+ getRecordInfoListWithIntegerComparison(numRecords1, primaryKeys1, timestamps1, null);
+
+ upsertMetadataManager.addSegment(segment1, validDocIds1, null, recordInfoList1.iterator());
+
+ Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
+ assertEquals(recordLocationMap.size(), 2);
+
+ int numRecords2 = 4;
+ int[] primaryKeys2 = new int[]{1, 2, 3, 4};
+ int[] timestamps2 = new int[]{150, 250, 300, 400};
+ ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
+ for (int i = 0; i < numRecords2; i++) {
+ validDocIds2.add(i);
+ }
+ List<PrimaryKey> primaryKeysList2 = getPrimaryKeyList(numRecords2, primaryKeys2);
+ ImmutableSegmentImpl segment2 = mockImmutableSegmentWithTimestamps(1, validDocIds2, null,
+ primaryKeysList2, timestamps2);
+ upsertMetadataManager.replaceSegment(segment2, segment1);
+
+ // Verify state after replacement - all records should be in new segment
+ assertEquals(recordLocationMap.size(), 4);
+ checkRecordLocation(recordLocationMap, 1, segment2, 0, 150, HashFunction.NONE);
+ checkRecordLocation(recordLocationMap, 2, segment2, 1, 250, HashFunction.NONE);
+ checkRecordLocation(recordLocationMap, 3, segment2, 2, 300, HashFunction.NONE);
+ checkRecordLocation(recordLocationMap, 4, segment2, 3, 400, HashFunction.NONE);
+
+ // New segment should have all docs valid
+ assertEquals(segment2.getValidDocIds().getMutableRoaringBitmap().getCardinality(), 4);
+
+ upsertMetadataManager.stop();
+ upsertMetadataManager.close();
+ }
+
+ @Test
+ public void testFullUpsertConsistencyNoneSameDocs() throws IOException {
+ // Test full upserts with consistency=NONE and same number of docs
+ UpsertContext upsertContext = _contextBuilder
+ .setConsistencyMode(UpsertConfig.ConsistencyMode.NONE)
+ .setDropOutOfOrderRecord(true)
+ .build();
+
+ ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+ new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, upsertContext);
+
+ // Create old segment with 3 records
+ int numRecords1 = 3;
+ int[] primaryKeys1 = new int[]{10, 20, 30};
+ int[] timestamps1 = new int[]{1000, 2000, 3000};
+ ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
+ for (int i = 0; i < numRecords1; i++) {
+ validDocIds1.add(i);
+ }
+ List<PrimaryKey> primaryKeysList1 = getPrimaryKeyList(numRecords1, primaryKeys1);
+ ImmutableSegmentImpl segment1 = mockImmutableSegmentWithTimestamps(1, validDocIds1, null,
+ primaryKeysList1, timestamps1);
+ List<RecordInfo> recordInfoList1 = getRecordInfoListWithIntegerComparison(numRecords1, primaryKeys1,
+ timestamps1, null);
+
+ upsertMetadataManager.addSegment(segment1, validDocIds1, null, recordInfoList1.iterator());
+ Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
+ assertEquals(recordLocationMap.size(), 3);
+
+ int numRecords2 = 3;
+ int[] primaryKeys2 = new int[]{10, 20, 30};
+ int[] timestamps2 = new int[]{1500, 2500, 3500};
+ ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
+ for (int i = 0; i < numRecords2; i++) {
+ validDocIds2.add(i);
+ }
+ List<PrimaryKey> primaryKeysList2 = getPrimaryKeyList(numRecords2, primaryKeys2);
+ ImmutableSegmentImpl segment2 = mockImmutableSegmentWithTimestamps(1, validDocIds2, null,
+ primaryKeysList2, timestamps2);
+ upsertMetadataManager.replaceSegment(segment2, segment1);
+ assertEquals(recordLocationMap.size(), 3);
+ checkRecordLocation(recordLocationMap, 10, segment2, 0, 1500, HashFunction.NONE);
+ checkRecordLocation(recordLocationMap, 20, segment2, 1, 2500, HashFunction.NONE);
+ checkRecordLocation(recordLocationMap, 30, segment2, 2, 3500, HashFunction.NONE);
+ assertEquals(segment2.getValidDocIds().getMutableRoaringBitmap().getCardinality(), 3);
+
+ upsertMetadataManager.stop();
+ upsertMetadataManager.close();
+ }
+
+ @Test
+ public void testFullUpsertConsistencyNoneOldSegmentMoreDocs()
+ throws Exception {
+ // Test full upserts with consistency=NONE where old segment has more docs
+ // Using real segments instead of mocks to avoid complex data source mocking
+ UpsertContext upsertContext =
+ _contextBuilder.setConsistencyMode(UpsertConfig.ConsistencyMode.NONE).setDropOutOfOrderRecord(true).build();
+
+ ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+ new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, upsertContext);
+
+ String segmentName = "test_segment";
+
+ // Create first real segment with 3 records
+ int[] primaryKeys1 = new int[]{10, 30, 40};
+ int[] timestamps1 = new int[]{1500, 3500, 4000};
+ ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
+ ImmutableSegmentImpl segment1 = createRealSegment(segmentName, primaryKeys1, timestamps1, validDocIds1);
+
+ upsertMetadataManager.addSegment(segment1);
+
+ Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
+ assertEquals(recordLocationMap.size(), 3);
+
+ // Create second real segment with 2 records (subset of first)
+ int[] primaryKeys2 = new int[]{10, 30};
+ int[] timestamps2 = new int[]{1500, 3500};
+ ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
+ ImmutableSegmentImpl segment2 = createRealSegment(segmentName, primaryKeys2, timestamps2, validDocIds2);
+
+ // Replace segment - RecordInfoReader will read real data from segment2
+ upsertMetadataManager.replaceSegment(segment2, segment1);
+
+ assertEquals(recordLocationMap.size(), 2);
+ checkRecordLocation(recordLocationMap, 10, segment2, 0, 1500, HashFunction.NONE);
+ checkRecordLocation(recordLocationMap, 30, segment2, 1, 3500, HashFunction.NONE);
+ assertEquals(segment2.getValidDocIds().getMutableRoaringBitmap().getCardinality(), 2);
+
+ upsertMetadataManager.stop();
+ upsertMetadataManager.close();
+
+ // Clean up real segments
+ segment1.destroy();
+ segment2.destroy();
+ }
+
+ @Test
+ public void testFullUpsertRegularConsistencyMode()
+ throws IOException {
+ // Test full upserts with regular consistency mode (not NONE) - no reversion should occur
+ UpsertContext upsertContext = _contextBuilder.setConsistencyMode(UpsertConfig.ConsistencyMode.SNAPSHOT).build();
+
+ ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+ new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, upsertContext);
+
+ int numRecords1 = 3;
+ int[] primaryKeys1 = new int[]{100, 200, 300};
+ int[] timestamps1 = new int[]{10000, 20000, 30000};
+ ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
+ for (int i = 0; i < numRecords1; i++) {
+ validDocIds1.add(i);
+ }
+ List<PrimaryKey> primaryKeysList1 = getPrimaryKeyList(numRecords1, primaryKeys1);
+ ImmutableSegmentImpl segment1 =
+ mockImmutableSegmentWithTimestamps(1, validDocIds1, null, primaryKeysList1, timestamps1);
+ List<RecordInfo> recordInfoList1 =
+ getRecordInfoListWithIntegerComparison(numRecords1, primaryKeys1, timestamps1, null);
+
+ upsertMetadataManager.addSegment(segment1, validDocIds1, null, recordInfoList1.iterator());
+
+ Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
+ assertEquals(recordLocationMap.size(), 3);
+ int numRecords2 = 1;
+ int[] primaryKeys2 = new int[]{100};
+ int[] timestamps2 = new int[]{15000};
+ ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
+ for (int i = 0; i < numRecords2; i++) {
+ validDocIds2.add(i);
+ }
+ List<PrimaryKey> primaryKeysList2 = getPrimaryKeyList(numRecords2, primaryKeys2);
+ ImmutableSegmentImpl segment2 =
+ mockImmutableSegmentWithTimestamps(1, validDocIds2, null, primaryKeysList2, timestamps2);
+
+ upsertMetadataManager.replaceSegment(segment2, segment1);
+
+ assertEquals(recordLocationMap.size(), 1);
+ checkRecordLocation(recordLocationMap, 100, segment2, 0, 15000, HashFunction.NONE);
+
+ assertEquals(segment1.getValidDocIds().getMutableRoaringBitmap().getCardinality(), 2);
+ assertEquals(segment2.getValidDocIds().getMutableRoaringBitmap().getCardinality(), 1);
+
+ upsertMetadataManager.stop();
+ upsertMetadataManager.close();
+ }
+
+ @Test
+ public void testRevertOnlyAppliesForConsumingSegmentSeal()
+ throws IOException {
+ UpsertContext upsertContext =
+ _contextBuilder.setConsistencyMode(UpsertConfig.ConsistencyMode.NONE).setDropOutOfOrderRecord(true).build();
+
+ ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+ new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, upsertContext);
+
+ int[] mutablePrimaryKeys = new int[]{10, 20, 30};
+ ThreadSafeMutableRoaringBitmap validDocIdsMutable = new ThreadSafeMutableRoaringBitmap();
+ MutableSegment mutableSegment = mockMutableSegmentWithDataSource(1, validDocIdsMutable, null, mutablePrimaryKeys);
+
+ upsertMetadataManager.addRecord(mutableSegment,
+ new RecordInfo(makePrimaryKey(10), 0, Integer.valueOf(1000), false));
+ upsertMetadataManager.addRecord(mutableSegment,
+ new RecordInfo(makePrimaryKey(20), 1, Integer.valueOf(2000), false));
+ upsertMetadataManager.addRecord(mutableSegment,
+ new RecordInfo(makePrimaryKey(30), 2, Integer.valueOf(3000), false));
+
+ Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
+ assertEquals(recordLocationMap.size(), 3);
+ assertEquals(validDocIdsMutable.getMutableRoaringBitmap().getCardinality(), 3);
+
+ int numRecords = 2;
+ int[] primaryKeys = new int[]{10, 20};
+ int[] timestamps = new int[]{1500, 2500};
+ ThreadSafeMutableRoaringBitmap validDocIdsImmutable = new ThreadSafeMutableRoaringBitmap();
+ List<PrimaryKey> primaryKeysList = getPrimaryKeyList(numRecords, primaryKeys);
+ ImmutableSegmentImpl immutableSegment = mockImmutableSegmentWithTimestamps(1, validDocIdsImmutable, null,
+ primaryKeysList, timestamps);
+
+ // This should trigger the revert logic since old segment is mutable
+ upsertMetadataManager.replaceSegment(immutableSegment, validDocIdsImmutable, null,
+ getRecordInfoListWithIntegerComparison(numRecords, primaryKeys, timestamps, null).iterator(), mutableSegment);
+
+ // After replacement, the records from immutable segment should be present
+ assertEquals(recordLocationMap.size(), 2);
+ checkRecordLocation(recordLocationMap, 10, immutableSegment, 0, 1500, HashFunction.NONE);
+ checkRecordLocation(recordLocationMap, 20, immutableSegment, 1, 2500, HashFunction.NONE);
+
+ upsertMetadataManager.stop();
+ upsertMetadataManager.close();
+ }
+
+ @Test
+ public void testNoRevertForImmutableSegmentReplacement()
+ throws IOException {
+ // Test that revert logic is NOT applied when replacing immutable segment with another immutable segment
+ UpsertContext upsertContext =
+ _contextBuilder.setConsistencyMode(UpsertConfig.ConsistencyMode.NONE).setDropOutOfOrderRecord(true).build();
+
+ ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+ new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, upsertContext);
+
+ // Create first immutable segment with 3 records
+ int numRecords1 = 3;
+ int[] primaryKeys1 = new int[]{10, 20, 30};
+ int[] timestamps1 = new int[]{1000, 2000, 3000};
+ ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
+ for (int i = 0; i < numRecords1; i++) {
+ validDocIds1.add(i);
+ }
+ List<PrimaryKey> primaryKeysList1 = getPrimaryKeyList(numRecords1, primaryKeys1);
+ ImmutableSegmentImpl segment1 = mockImmutableSegmentWithTimestamps(1, validDocIds1, null,
+ primaryKeysList1, timestamps1);
+
+ upsertMetadataManager.addSegment(segment1, validDocIds1, null,
+ getRecordInfoListWithIntegerComparison(numRecords1, primaryKeys1, timestamps1, null).iterator());
+ Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
+ assertEquals(recordLocationMap.size(), 3);
+
+ int numRecords2 = 1;
+ int[] primaryKeys2 = new int[]{10};
+ int[] timestamps2 = new int[]{1500};
+ ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
+ for (int i = 0; i < numRecords2; i++) {
+ validDocIds2.add(i);
+ }
+ List<PrimaryKey> primaryKeysList2 = getPrimaryKeyList(numRecords2, primaryKeys2);
+ ImmutableSegmentImpl segment2 = mockImmutableSegmentWithTimestamps(1, validDocIds2, null,
+ primaryKeysList2, timestamps2);
+
+ long startTime = System.currentTimeMillis();
+ upsertMetadataManager.replaceSegment(segment2, validDocIds2, null,
+ getRecordInfoListWithIntegerComparison(numRecords2, primaryKeys2, timestamps2, null).iterator(), segment1);
+ long duration = System.currentTimeMillis() - startTime;
+
+ assertTrue(duration < 1000, "Immutable-to-immutable replacement should complete quickly, took: " + duration + "ms");
Review Comment:
The hard-coded 1000ms timeout threshold may cause flaky test failures in
slower CI environments or under load. Consider increasing this threshold or
using a more flexible approach such as comparing execution time between revert
and non-revert scenarios.
```suggestion
assertTrue(duration < 30_000,
"Immutable-to-immutable replacement should complete quickly (under 30000ms), took: " + duration + "ms");
```
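A more flexible option, sketched here under some assumptions, is to measure elapsed time with the monotonic System.nanoTime() clock and compare it against a named, generous budget. System.currentTimeMillis() follows wall-clock time, which can jump. ElapsedTimeCheck and MAX_REPLACEMENT_MILLIS are hypothetical names. The 30-second budget mirrors the suggestion above.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper: times an action with the monotonic System.nanoTime() clock
// and checks the result against a named budget instead of an inline magic number.
final class ElapsedTimeCheck {
  // Generous budget, mirroring the 30_000 ms value in the suggestion.
  static final long MAX_REPLACEMENT_MILLIS = 30_000L;

  private ElapsedTimeCheck() {
  }

  // Runs the action and returns how long it took, in whole milliseconds.
  static long elapsedMillis(Runnable action) {
    long start = System.nanoTime();
    action.run();
    return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
  }

  // Fails with a descriptive message when the measured time reaches the budget.
  static void assertWithinBudget(long elapsedMillis, long budgetMillis, String what) {
    if (elapsedMillis >= budgetMillis) {
      throw new AssertionError(what + " took " + elapsedMillis + "ms, budget " + budgetMillis + "ms");
    }
  }
}
```

Comparing revert and non-revert runs, as the comment also suggests, would avoid any absolute threshold. However, it needs two timed runs per test.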
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -659,37 +665,94 @@ public void replaceSegment(ImmutableSegment segment, @Nullable ThreadSafeMutable
validDocIdsForOldSegment = getValidDocIdsForOldSegment(oldSegment);
}
if (validDocIdsForOldSegment != null && !validDocIdsForOldSegment.isEmpty()) {
- int numKeysNotReplaced = validDocIdsForOldSegment.getCardinality();
- // Add the new metric tracking here
- if (_context.isDropOutOfOrderRecord() && _context.getConsistencyMode() == UpsertConfig.ConsistencyMode.NONE) {
- // For Upsert tables when some of the records get dropped when dropOutOfOrderRecord is enabled, we donot
- // store the original record location when keys are not replaced, this can potentially cause inconsistencies
- // leading to some rows not getting dropped when reconsumed. This can be caused when a consuming segment
- // that is consumed from a different server is replaced with the existing segment which consumed rows ahead
- // of the other server
- _logger.warn(
- "Found {} primary keys not replaced when replacing segment: {} for upsert table with dropOutOfOrderRecord"
- + " enabled with no consistency mode. This can potentially cause inconsistency between replicas",
- numKeysNotReplaced, segmentName);
- _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.REALTIME_UPSERT_INCONSISTENT_ROWS,
- numKeysNotReplaced);
- } else if (_partialUpsertHandler != null) {
- // For partial-upsert table, because we do not restore the original record location when removing the primary
- // keys not replaced, it can potentially cause inconsistency between replicas. This can happen when a
- // consuming segment is replaced by a committed segment that is consumed from a different server with
- // different records (some stream consumer cannot guarantee consuming the messages in the same order/
- // when a segment is replaced with lesser consumed rows from the other server).
- _logger.warn("Found {} primary keys not replaced when replacing segment: {} for partial-upsert table. This "
- + "can potentially cause inconsistency between replicas", numKeysNotReplaced, segmentName);
- _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.PARTIAL_UPSERT_KEYS_NOT_REPLACED,
- numKeysNotReplaced);
+ checkForInconsistencies(segment, validDocIds, queryableDocIds, oldSegment, validDocIdsForOldSegment, segmentName);
+ removeSegment(oldSegment, validDocIdsForOldSegment);
+ }
+ }
+
+ void checkForInconsistencies(ImmutableSegment segment, ThreadSafeMutableRoaringBitmap validDocIds,
Review Comment:
This method should have protected visibility instead of package-private to
maintain consistency with the other protected methods in this class hierarchy.
```suggestion
protected void checkForInconsistencies(ImmutableSegment segment,
ThreadSafeMutableRoaringBitmap validDocIds,
```
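To make the branching easier to review, here is a self-contained model of the decision that checkForInconsistencies makes in this PR. Metadata is reverted only when a consuming (mutable) segment is sealed and either dropOutOfOrderRecord is enabled with consistency mode NONE, or a partial-upsert handler is configured. Every other case only logs a warning. The enums and the decide method are hypothetical stand-ins, not Pinot APIs.

```java
// Hypothetical, self-contained model of the branching in checkForInconsistencies;
// the enums and method are illustrative stand-ins, not Pinot APIs.
final class InconsistencyDecision {
  // Only the two consistency modes exercised by the tests in this PR.
  enum ConsistencyMode { NONE, SNAPSHOT }

  enum Action { REVERT_AND_RETRY, WARN_ONLY }

  private InconsistencyDecision() {
  }

  static Action decide(boolean oldSegmentIsImmutable, boolean dropOutOfOrderRecord,
      ConsistencyMode consistencyMode, boolean hasPartialUpsertHandler) {
    // Reversion is only considered when a consuming (mutable) segment is being sealed.
    boolean isConsumingSegmentSeal = !oldSegmentIsImmutable;
    if (isConsumingSegmentSeal && dropOutOfOrderRecord && consistencyMode == ConsistencyMode.NONE) {
      return Action.REVERT_AND_RETRY;
    }
    if (isConsumingSegmentSeal && hasPartialUpsertHandler) {
      return Action.REVERT_AND_RETRY;
    }
    // Everything else, e.g. immutable-to-immutable replacement, only logs a warning.
    return Action.WARN_ONLY;
  }
}
```

This model matches the expectations in testNoRevertForImmutableSegmentReplacement, where the old segment is immutable and only a warning is expected. It also matches testPartialUpsertOldSegmentTriggerReversion, where a consuming segment combined with partial upsert triggers a revert.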
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -659,37 +665,94 @@ public void replaceSegment(ImmutableSegment segment, @Nullable ThreadSafeMutable
validDocIdsForOldSegment = getValidDocIdsForOldSegment(oldSegment);
}
if (validDocIdsForOldSegment != null && !validDocIdsForOldSegment.isEmpty()) {
- int numKeysNotReplaced = validDocIdsForOldSegment.getCardinality();
- // Add the new metric tracking here
- if (_context.isDropOutOfOrderRecord() && _context.getConsistencyMode() == UpsertConfig.ConsistencyMode.NONE) {
- // For Upsert tables when some of the records get dropped when dropOutOfOrderRecord is enabled, we donot
- // store the original record location when keys are not replaced, this can potentially cause inconsistencies
- // leading to some rows not getting dropped when reconsumed. This can be caused when a consuming segment
- // that is consumed from a different server is replaced with the existing segment which consumed rows ahead
- // of the other server
- _logger.warn(
- "Found {} primary keys not replaced when replacing segment: {} for upsert table with dropOutOfOrderRecord"
- + " enabled with no consistency mode. This can potentially cause inconsistency between replicas",
- numKeysNotReplaced, segmentName);
- _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.REALTIME_UPSERT_INCONSISTENT_ROWS,
- numKeysNotReplaced);
- } else if (_partialUpsertHandler != null) {
- // For partial-upsert table, because we do not restore the original record location when removing the primary
- // keys not replaced, it can potentially cause inconsistency between replicas. This can happen when a
- // consuming segment is replaced by a committed segment that is consumed from a different server with
- // different records (some stream consumer cannot guarantee consuming the messages in the same order/
- // when a segment is replaced with lesser consumed rows from the other server).
- _logger.warn("Found {} primary keys not replaced when replacing segment: {} for partial-upsert table. This "
- + "can potentially cause inconsistency between replicas", numKeysNotReplaced, segmentName);
- _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.PARTIAL_UPSERT_KEYS_NOT_REPLACED,
- numKeysNotReplaced);
+ checkForInconsistencies(segment, validDocIds, queryableDocIds, oldSegment, validDocIdsForOldSegment, segmentName);
+ removeSegment(oldSegment, validDocIdsForOldSegment);
+ }
+ }
+
+ void checkForInconsistencies(ImmutableSegment segment, ThreadSafeMutableRoaringBitmap validDocIds,
+ ThreadSafeMutableRoaringBitmap queryableDocIds, IndexSegment oldSegment,
+ MutableRoaringBitmap validDocIdsForOldSegment, String segmentName) {
+ int numKeysNotReplaced = validDocIdsForOldSegment.getCardinality();
+ boolean isConsumingSegmentSeal = !(oldSegment instanceof ImmutableSegment);
+ // For partial-upsert table and upsert table with dropOutOfOrder=true & consistencyMode = NONE, we do not store
+ // the previous record location when removing the primary keys not replaced, it can potentially cause inconsistency
+ // between replicas. This can happen when a consuming segment is replaced by a committed segment that is consumed
+ // from a different server with different records (some stream consumer cannot guarantee consuming the messages in
+ // the same order/ when a segment is replaced with lesser consumed rows from the other server).
+ if (isConsumingSegmentSeal && _context.isDropOutOfOrderRecord()
+ && _context.getConsistencyMode() == UpsertConfig.ConsistencyMode.NONE) {
+ _logger.warn("Found {} primary keys not replaced when sealing consuming segment: {} for upsert table with "
+ + "dropOutOfOrderRecord enabled with no consistency mode. This can potentially cause inconsistency "
+ + "between replicas. Reverting back metadata changes and triggering segment replacement.",
+ numKeysNotReplaced,
+ segmentName);
+ revertSegmentUpsertMetadataWithRetry(segment, validDocIds, queryableDocIds, oldSegment, segmentName);
+ } else if (isConsumingSegmentSeal && _partialUpsertHandler != null) {
+ _logger.warn("Found {} primary keys not replaced when sealing consuming segment: {} for partial-upsert table. "
+ + "This can potentially cause inconsistency between replicas. "
+ + "Reverting metadata changes and triggering segment replacement.", numKeysNotReplaced, segmentName);
+ revertSegmentUpsertMetadataWithRetry(segment, validDocIds, queryableDocIds, oldSegment, segmentName);
+ } else {
+ _logger.warn("Found {} primary keys not replaced for the segment: {}.", numKeysNotReplaced, segmentName);
+ }
+ }
+
+ /**
+ * Reverts segment upsert metadata and retries addOrReplaceSegment with a maximum retry limit to prevent infinite
+ * recursion in case of persistent inconsistencies.
+ */
+ void revertSegmentUpsertMetadataWithRetry(ImmutableSegment segment, ThreadSafeMutableRoaringBitmap validDocIds,
+ ThreadSafeMutableRoaringBitmap queryableDocIds, IndexSegment oldSegment, String segmentName) {
Review Comment:
This method should have protected visibility instead of package-private to
maintain consistency with the other protected methods in this class hierarchy.
Protected access would also allow subclasses outside this package to override it.
```suggestion
protected void revertSegmentUpsertMetadataWithRetry(ImmutableSegment segment,
ThreadSafeMutableRoaringBitmap validDocIds,
ThreadSafeMutableRoaringBitmap queryableDocIds,
IndexSegment oldSegment, String segmentName) {
```
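The javadoc on this method describes a bounded revert-and-retry. A minimal sketch of that bounding uses an iterative loop rather than recursion. It assumes each attempt (revert metadata, re-add the segment, check for keys not replaced) can be wrapped as a callback that reports success. BoundedRetry, runWithRetry and MAX_RETRIES = 3 are hypothetical, and the diff does not show the actual limit.

```java
import java.util.function.BooleanSupplier;

// Hypothetical sketch of bounding a revert-and-retry with a loop instead of
// recursion; names and the retry limit are assumptions, not the PR's code.
final class BoundedRetry {
  static final int MAX_RETRIES = 3;

  private BoundedRetry() {
  }

  /**
   * Calls attempt until it returns true or maxRetries attempts have been made.
   * Returns the number of attempts used, or -1 if every attempt failed.
   */
  static int runWithRetry(BooleanSupplier attempt, int maxRetries) {
    for (int i = 1; i <= maxRetries; i++) {
      if (attempt.getAsBoolean()) {
        return i;
      }
    }
    return -1;
  }
}
```

A loop keeps the stack depth constant and makes the limit explicit at the call site. After the limit is reached, the caller can decide whether to log, emit a metric, or give up.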
--
This is an automated message from the Apache Git Service.