Copilot commented on code in PR #17324:
URL: https://github.com/apache/pinot/pull/17324#discussion_r2651134937
##########
pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java:
##########
@@ -784,6 +814,17 @@ private List<RecordInfo> getRecordInfoListForTTL(int
numRecords, int[] primaryKe
return recordInfoList;
}
+ // Helper method for new reversion tests that need Integer comparison values
+ private List<RecordInfo> getRecordInfoListWithIntegerComparison(int
numRecords, int[] primaryKeys, int[] timestamps,
Review Comment:
The method name `getRecordInfoListWithIntegerComparison` is longer than, and
inconsistent with, the neighboring helper `getRecordInfoListForTTL`, which uses
a short `getRecordInfoList` + suffix pattern. Consider overloading
`getRecordInfoList` or renaming to `getRecordInfoListWithIntComparison` for
consistency with the codebase's naming style. Note that the suggestion below only
renames the definition; every call site in the new tests must be updated too.
```suggestion
private List<RecordInfo> getRecordInfoListWithIntComparison(int
numRecords, int[] primaryKeys, int[] timestamps,
```
##########
pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java:
##########
@@ -1724,4 +1901,411 @@ public int hashCode() {
return _value;
}
}
+
+ // Tests for upsert metadata reversion functionality
+ @Test
+ public void testPartialUpsertSameDocsReplacement() throws IOException {
+ // Test partial upserts with old and new segments having same number of
docs
+ // This test verifies that when all keys are present, no reversion occurs
+ PartialUpsertHandler mockPartialUpsertHandler =
mock(PartialUpsertHandler.class);
+ UpsertContext upsertContext =
_contextBuilder.setPartialUpsertHandler(mockPartialUpsertHandler)
+ .setConsistencyMode(UpsertConfig.ConsistencyMode.NONE).build();
+
+ ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+ new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME,
0, upsertContext);
+
+ int numRecords1 = 3;
+ int[] primaryKeys1 = new int[]{1, 2, 3};
+ int[] timestamps1 = new int[]{100, 200, 300};
+ ThreadSafeMutableRoaringBitmap validDocIds1 = new
ThreadSafeMutableRoaringBitmap();
+ for (int i = 0; i < numRecords1; i++) {
+ validDocIds1.add(i);
+ }
+ List<PrimaryKey> primaryKeysList1 = getPrimaryKeyList(numRecords1,
primaryKeys1);
+ ImmutableSegmentImpl segment1 = mockImmutableSegmentWithTimestamps(1,
validDocIds1, null,
+ primaryKeysList1, timestamps1);
+ List<RecordInfo> recordInfoList1 =
getRecordInfoListWithIntegerComparison(numRecords1, primaryKeys1,
+ timestamps1, null);
+
+ upsertMetadataManager.addSegment(segment1, validDocIds1, null,
recordInfoList1.iterator());
+
+ Map<Object, RecordLocation> recordLocationMap =
upsertMetadataManager._primaryKeyToRecordLocationMap;
+ assertEquals(recordLocationMap.size(), 3);
+ checkRecordLocation(recordLocationMap, 1, segment1, 0, 100,
HashFunction.NONE);
+ checkRecordLocation(recordLocationMap, 2, segment1, 1, 200,
HashFunction.NONE);
+ checkRecordLocation(recordLocationMap, 3, segment1, 2, 300,
HashFunction.NONE);
+
+ // Create new segment with same 3 records but updated timestamps
+ int numRecords2 = 3;
+ int[] primaryKeys2 = new int[]{1, 2, 3};
+ int[] timestamps2 = new int[]{150, 250, 350};
+ ThreadSafeMutableRoaringBitmap validDocIds2 = new
ThreadSafeMutableRoaringBitmap();
+ for (int i = 0; i < numRecords2; i++) {
+ validDocIds2.add(i);
+ }
+ List<PrimaryKey> primaryKeysList2 = getPrimaryKeyList(numRecords2,
primaryKeys2);
+ ImmutableSegmentImpl segment2 = mockImmutableSegmentWithTimestamps(1,
validDocIds2, null,
+ primaryKeysList2, timestamps2);
+ List<RecordInfo> recordInfoList2 =
getRecordInfoListWithIntegerComparison(numRecords2, primaryKeys2,
+ timestamps2, null);
+
+ // Replace segment - should trigger reversion logic but no reversion
needed since all keys are present
+ upsertMetadataManager.replaceSegment(segment2, segment1);
+
+ // Verify replacement - all records should be in new segment
+ assertEquals(recordLocationMap.size(), 3);
+ checkRecordLocation(recordLocationMap, 1, segment2, 0, 150,
HashFunction.NONE);
+ checkRecordLocation(recordLocationMap, 2, segment2, 1, 250,
HashFunction.NONE);
+ checkRecordLocation(recordLocationMap, 3, segment2, 2, 350,
HashFunction.NONE);
+
+ // New segment should have all docs valid
+
assertEquals(segment2.getValidDocIds().getMutableRoaringBitmap().getCardinality(),
3);
+
+ upsertMetadataManager.stop();
+ upsertMetadataManager.close();
+ }
+
+ @Test
+ public void testPartialUpsertOldSegmentTriggerReversion() throws IOException
{
+ // Test partial upserts with consuming (mutable) segment being sealed -
revert should be triggered
+ // Note: Revert logic only applies when sealing a consuming segment, not
for immutable segment replacement
+ PartialUpsertHandler mockPartialUpsertHandler =
mock(PartialUpsertHandler.class);
+ UpsertContext upsertContext =
_contextBuilder.setPartialUpsertHandler(mockPartialUpsertHandler)
+ .setConsistencyMode(UpsertConfig.ConsistencyMode.NONE).build();
+
+ ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+ new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME,
0, upsertContext);
+
+ // Create a mutable (consuming) segment with 4 records - use
mockMutableSegmentWithDataSource
+ // to support removeSegment which needs to read primary keys
+ int[] mutablePrimaryKeys = new int[]{1, 2, 3, 4};
+ ThreadSafeMutableRoaringBitmap validDocIds1 = new
ThreadSafeMutableRoaringBitmap();
+ MutableSegment mutableSegment = mockMutableSegmentWithDataSource(1,
validDocIds1, null, mutablePrimaryKeys);
+
+ // Add records to the mutable segment
+ upsertMetadataManager.addRecord(mutableSegment, new
RecordInfo(makePrimaryKey(1), 0, new Integer(100), false));
+ upsertMetadataManager.addRecord(mutableSegment, new
RecordInfo(makePrimaryKey(2), 1, new Integer(200), false));
+ upsertMetadataManager.addRecord(mutableSegment, new
RecordInfo(makePrimaryKey(3), 2, new Integer(300), false));
+ upsertMetadataManager.addRecord(mutableSegment, new
RecordInfo(makePrimaryKey(4), 3, new Integer(400), false));
Review Comment:
Avoid using the deprecated `new Integer()` constructor. Use
`Integer.valueOf()` or autoboxing (just pass the int value directly) instead.
The `new Integer()` constructor has been deprecated since Java 9.
```suggestion
upsertMetadataManager.addRecord(mutableSegment, new
RecordInfo(makePrimaryKey(1), 0, 100, false));
upsertMetadataManager.addRecord(mutableSegment, new
RecordInfo(makePrimaryKey(2), 1, 200, false));
upsertMetadataManager.addRecord(mutableSegment, new
RecordInfo(makePrimaryKey(3), 2, 300, false));
upsertMetadataManager.addRecord(mutableSegment, new
RecordInfo(makePrimaryKey(4), 3, 400, false));
```
##########
pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java:
##########
@@ -784,6 +814,17 @@ private List<RecordInfo> getRecordInfoListForTTL(int
numRecords, int[] primaryKe
return recordInfoList;
}
+ // Helper method for new reversion tests that need Integer comparison values
+ private List<RecordInfo> getRecordInfoListWithIntegerComparison(int
numRecords, int[] primaryKeys, int[] timestamps,
+ @Nullable boolean[] deleteRecordFlags) {
+ List<RecordInfo> recordInfoList = new ArrayList<>();
+ for (int i = 0; i < numRecords; i++) {
+ recordInfoList.add(new RecordInfo(makePrimaryKey(primaryKeys[i]), i,
Integer.valueOf(timestamps[i]),
Review Comment:
Explicitly wrapping primitive int values with `Integer.valueOf()` is unnecessary.
The comparison value can be passed directly as `timestamps[i]`, and autoboxing
will handle the conversion. Autoboxing compiles to the same `Integer.valueOf()`
call, so this is purely a readability improvement; it does not reduce object creation.
```suggestion
recordInfoList.add(new RecordInfo(makePrimaryKey(primaryKeys[i]), i,
timestamps[i],
```
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -620,6 +624,8 @@ protected void doReplaceSegment(ImmutableSegment segment,
IndexSegment oldSegmen
System.currentTimeMillis() - startTimeMs, numPrimaryKeys);
}
+ private static final int MAX_UPSERT_REVERT_RETRIES = 3;
Review Comment:
The retry limit is hardcoded to 3. Consider making it configurable, either
through UpsertConfig or, as in the suggestion below, via a JVM system property.
This lets operators tune the retry behavior to their specific consistency
requirements and performance constraints.
```suggestion
private static final String MAX_UPSERT_REVERT_RETRIES_SYS_PROP =
"pinot.upsert.maxRevertRetries";
private static final int DEFAULT_MAX_UPSERT_REVERT_RETRIES = 3;
private static final int MAX_UPSERT_REVERT_RETRIES =
Integer.getInteger(MAX_UPSERT_REVERT_RETRIES_SYS_PROP,
DEFAULT_MAX_UPSERT_REVERT_RETRIES);
```
##########
pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java:
##########
@@ -1724,4 +1901,411 @@ public int hashCode() {
return _value;
}
}
+
+ // Tests for upsert metadata reversion functionality
+ @Test
+ public void testPartialUpsertSameDocsReplacement() throws IOException {
+ // Test partial upserts with old and new segments having same number of
docs
+ // This test verifies that when all keys are present, no reversion occurs
+ PartialUpsertHandler mockPartialUpsertHandler =
mock(PartialUpsertHandler.class);
+ UpsertContext upsertContext =
_contextBuilder.setPartialUpsertHandler(mockPartialUpsertHandler)
+ .setConsistencyMode(UpsertConfig.ConsistencyMode.NONE).build();
+
+ ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+ new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME,
0, upsertContext);
+
+ int numRecords1 = 3;
+ int[] primaryKeys1 = new int[]{1, 2, 3};
+ int[] timestamps1 = new int[]{100, 200, 300};
+ ThreadSafeMutableRoaringBitmap validDocIds1 = new
ThreadSafeMutableRoaringBitmap();
+ for (int i = 0; i < numRecords1; i++) {
+ validDocIds1.add(i);
+ }
+ List<PrimaryKey> primaryKeysList1 = getPrimaryKeyList(numRecords1,
primaryKeys1);
+ ImmutableSegmentImpl segment1 = mockImmutableSegmentWithTimestamps(1,
validDocIds1, null,
+ primaryKeysList1, timestamps1);
+ List<RecordInfo> recordInfoList1 =
getRecordInfoListWithIntegerComparison(numRecords1, primaryKeys1,
+ timestamps1, null);
+
+ upsertMetadataManager.addSegment(segment1, validDocIds1, null,
recordInfoList1.iterator());
+
+ Map<Object, RecordLocation> recordLocationMap =
upsertMetadataManager._primaryKeyToRecordLocationMap;
+ assertEquals(recordLocationMap.size(), 3);
+ checkRecordLocation(recordLocationMap, 1, segment1, 0, 100,
HashFunction.NONE);
+ checkRecordLocation(recordLocationMap, 2, segment1, 1, 200,
HashFunction.NONE);
+ checkRecordLocation(recordLocationMap, 3, segment1, 2, 300,
HashFunction.NONE);
+
+ // Create new segment with same 3 records but updated timestamps
+ int numRecords2 = 3;
+ int[] primaryKeys2 = new int[]{1, 2, 3};
+ int[] timestamps2 = new int[]{150, 250, 350};
+ ThreadSafeMutableRoaringBitmap validDocIds2 = new
ThreadSafeMutableRoaringBitmap();
+ for (int i = 0; i < numRecords2; i++) {
+ validDocIds2.add(i);
+ }
+ List<PrimaryKey> primaryKeysList2 = getPrimaryKeyList(numRecords2,
primaryKeys2);
+ ImmutableSegmentImpl segment2 = mockImmutableSegmentWithTimestamps(1,
validDocIds2, null,
+ primaryKeysList2, timestamps2);
+ List<RecordInfo> recordInfoList2 =
getRecordInfoListWithIntegerComparison(numRecords2, primaryKeys2,
+ timestamps2, null);
+
+ // Replace segment - should trigger reversion logic but no reversion
needed since all keys are present
+ upsertMetadataManager.replaceSegment(segment2, segment1);
+
+ // Verify replacement - all records should be in new segment
+ assertEquals(recordLocationMap.size(), 3);
+ checkRecordLocation(recordLocationMap, 1, segment2, 0, 150,
HashFunction.NONE);
+ checkRecordLocation(recordLocationMap, 2, segment2, 1, 250,
HashFunction.NONE);
+ checkRecordLocation(recordLocationMap, 3, segment2, 2, 350,
HashFunction.NONE);
+
+ // New segment should have all docs valid
+
assertEquals(segment2.getValidDocIds().getMutableRoaringBitmap().getCardinality(),
3);
+
+ upsertMetadataManager.stop();
+ upsertMetadataManager.close();
+ }
+
+ @Test
+ public void testPartialUpsertOldSegmentTriggerReversion() throws IOException
{
+ // Test partial upserts with consuming (mutable) segment being sealed -
revert should be triggered
+ // Note: Revert logic only applies when sealing a consuming segment, not
for immutable segment replacement
+ PartialUpsertHandler mockPartialUpsertHandler =
mock(PartialUpsertHandler.class);
+ UpsertContext upsertContext =
_contextBuilder.setPartialUpsertHandler(mockPartialUpsertHandler)
+ .setConsistencyMode(UpsertConfig.ConsistencyMode.NONE).build();
+
+ ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+ new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME,
0, upsertContext);
+
+ // Create a mutable (consuming) segment with 4 records - use
mockMutableSegmentWithDataSource
+ // to support removeSegment which needs to read primary keys
+ int[] mutablePrimaryKeys = new int[]{1, 2, 3, 4};
+ ThreadSafeMutableRoaringBitmap validDocIds1 = new
ThreadSafeMutableRoaringBitmap();
+ MutableSegment mutableSegment = mockMutableSegmentWithDataSource(1,
validDocIds1, null, mutablePrimaryKeys);
+
+ // Add records to the mutable segment
+ upsertMetadataManager.addRecord(mutableSegment, new
RecordInfo(makePrimaryKey(1), 0, new Integer(100), false));
+ upsertMetadataManager.addRecord(mutableSegment, new
RecordInfo(makePrimaryKey(2), 1, new Integer(200), false));
+ upsertMetadataManager.addRecord(mutableSegment, new
RecordInfo(makePrimaryKey(3), 2, new Integer(300), false));
+ upsertMetadataManager.addRecord(mutableSegment, new
RecordInfo(makePrimaryKey(4), 3, new Integer(400), false));
+
+ Map<Object, RecordLocation> recordLocationMap =
upsertMetadataManager._primaryKeyToRecordLocationMap;
+ assertEquals(recordLocationMap.size(), 4);
+ assertEquals(validDocIds1.getMutableRoaringBitmap().getCardinality(), 4);
+
+ int numRecords2 = 2;
+ int[] primaryKeys2 = new int[]{1, 3};
+ int[] timestamps2 = new int[]{150, 350};
+ ThreadSafeMutableRoaringBitmap validDocIds2 = new
ThreadSafeMutableRoaringBitmap();
+ List<PrimaryKey> primaryKeysList2 = getPrimaryKeyList(numRecords2,
primaryKeys2);
+ ImmutableSegmentImpl segment2 = mockImmutableSegmentWithTimestamps(1,
validDocIds2, null,
+ primaryKeysList2, timestamps2);
+
+ // Replace mutable with immutable (consuming segment seal) - revert SHOULD
be triggered
+ upsertMetadataManager.replaceSegment(segment2, validDocIds2, null,
+ getRecordInfoListWithIntegerComparison(numRecords2, primaryKeys2,
timestamps2, null).iterator(),
+ mutableSegment);
+
+ assertEquals(recordLocationMap.size(), 2);
+
assertEquals(segment2.getValidDocIds().getMutableRoaringBitmap().getCardinality(),
2);
+ checkRecordLocation(recordLocationMap, 1, segment2, 0, 150,
HashFunction.NONE);
+ checkRecordLocation(recordLocationMap, 3, segment2, 1, 350,
HashFunction.NONE);
+
+ // Mutable segment's validDocIds should be 0 after removal
+ assertEquals(validDocIds1.getMutableRoaringBitmap().getCardinality(), 0);
+ upsertMetadataManager.stop();
+ upsertMetadataManager.close();
+ }
+
+ @Test
+ public void testPartialUpsertOldSegmentLesserDocs() throws IOException {
+ // Test partial upserts with old segment having fewer docs than new segment
+ PartialUpsertHandler mockPartialUpsertHandler =
mock(PartialUpsertHandler.class);
+ UpsertContext upsertContext =
_contextBuilder.setPartialUpsertHandler(mockPartialUpsertHandler)
+ .setConsistencyMode(UpsertConfig.ConsistencyMode.NONE).build();
+
+ ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+ new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME,
0, upsertContext);
+
+ int numRecords1 = 2;
+ int[] primaryKeys1 = new int[]{1, 2};
+ int[] timestamps1 = new int[]{100, 200};
+ ThreadSafeMutableRoaringBitmap validDocIds1 = new
ThreadSafeMutableRoaringBitmap();
+ for (int i = 0; i < numRecords1; i++) {
+ validDocIds1.add(i);
+ }
+ List<PrimaryKey> primaryKeysList1 = getPrimaryKeyList(numRecords1,
primaryKeys1);
+ ImmutableSegmentImpl segment1 =
+ mockImmutableSegmentWithTimestamps(1, validDocIds1, null,
primaryKeysList1, timestamps1);
+ List<RecordInfo> recordInfoList1 =
+ getRecordInfoListWithIntegerComparison(numRecords1, primaryKeys1,
timestamps1, null);
+
+ upsertMetadataManager.addSegment(segment1, validDocIds1, null,
recordInfoList1.iterator());
+
+ Map<Object, RecordLocation> recordLocationMap =
upsertMetadataManager._primaryKeyToRecordLocationMap;
+ assertEquals(recordLocationMap.size(), 2);
+
+ int numRecords2 = 4;
+ int[] primaryKeys2 = new int[]{1, 2, 3, 4};
+ int[] timestamps2 = new int[]{150, 250, 300, 400};
+ ThreadSafeMutableRoaringBitmap validDocIds2 = new
ThreadSafeMutableRoaringBitmap();
+ for (int i = 0; i < numRecords2; i++) {
+ validDocIds2.add(i);
+ }
+ List<PrimaryKey> primaryKeysList2 = getPrimaryKeyList(numRecords2,
primaryKeys2);
+ ImmutableSegmentImpl segment2 = mockImmutableSegmentWithTimestamps(1,
validDocIds2, null,
+ primaryKeysList2, timestamps2);
+ upsertMetadataManager.replaceSegment(segment2, segment1);
+
+ // Verify state after replacement - all records should be in new segment
+ assertEquals(recordLocationMap.size(), 4);
+ checkRecordLocation(recordLocationMap, 1, segment2, 0, 150,
HashFunction.NONE);
+ checkRecordLocation(recordLocationMap, 2, segment2, 1, 250,
HashFunction.NONE);
+ checkRecordLocation(recordLocationMap, 3, segment2, 2, 300,
HashFunction.NONE);
+ checkRecordLocation(recordLocationMap, 4, segment2, 3, 400,
HashFunction.NONE);
+
+ // New segment should have all docs valid
+
assertEquals(segment2.getValidDocIds().getMutableRoaringBitmap().getCardinality(),
4);
+
+ upsertMetadataManager.stop();
+ upsertMetadataManager.close();
+ }
+
+ @Test
+ public void testFullUpsertConsistencyNoneSameDocs() throws IOException {
+ // Test full upserts with consistency=NONE and same number of docs
+ UpsertContext upsertContext = _contextBuilder
+ .setConsistencyMode(UpsertConfig.ConsistencyMode.NONE)
+ .setDropOutOfOrderRecord(true)
+ .build();
+
+ ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+ new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME,
0, upsertContext);
+
+ // Create old segment with 3 records
+ int numRecords1 = 3;
+ int[] primaryKeys1 = new int[]{10, 20, 30};
+ int[] timestamps1 = new int[]{1000, 2000, 3000};
+ ThreadSafeMutableRoaringBitmap validDocIds1 = new
ThreadSafeMutableRoaringBitmap();
+ for (int i = 0; i < numRecords1; i++) {
+ validDocIds1.add(i);
+ }
+ List<PrimaryKey> primaryKeysList1 = getPrimaryKeyList(numRecords1,
primaryKeys1);
+ ImmutableSegmentImpl segment1 = mockImmutableSegmentWithTimestamps(1,
validDocIds1, null,
+ primaryKeysList1, timestamps1);
+ List<RecordInfo> recordInfoList1 =
getRecordInfoListWithIntegerComparison(numRecords1, primaryKeys1,
+ timestamps1, null);
+
+ upsertMetadataManager.addSegment(segment1, validDocIds1, null,
recordInfoList1.iterator());
+ Map<Object, RecordLocation> recordLocationMap =
upsertMetadataManager._primaryKeyToRecordLocationMap;
+ assertEquals(recordLocationMap.size(), 3);
+
+ int numRecords2 = 3;
+ int[] primaryKeys2 = new int[]{10, 20, 30};
+ int[] timestamps2 = new int[]{1500, 2500, 3500};
+ ThreadSafeMutableRoaringBitmap validDocIds2 = new
ThreadSafeMutableRoaringBitmap();
+ for (int i = 0; i < numRecords2; i++) {
+ validDocIds2.add(i);
+ }
+ List<PrimaryKey> primaryKeysList2 = getPrimaryKeyList(numRecords2,
primaryKeys2);
+ ImmutableSegmentImpl segment2 = mockImmutableSegmentWithTimestamps(1,
validDocIds2, null,
+ primaryKeysList2, timestamps2);
+ upsertMetadataManager.replaceSegment(segment2, segment1);
+ assertEquals(recordLocationMap.size(), 3);
+ checkRecordLocation(recordLocationMap, 10, segment2, 0, 1500,
HashFunction.NONE);
+ checkRecordLocation(recordLocationMap, 20, segment2, 1, 2500,
HashFunction.NONE);
+ checkRecordLocation(recordLocationMap, 30, segment2, 2, 3500,
HashFunction.NONE);
+
assertEquals(segment2.getValidDocIds().getMutableRoaringBitmap().getCardinality(),
3);
+
+ upsertMetadataManager.stop();
+ upsertMetadataManager.close();
+ }
+
+ @Test
+ public void testFullUpsertConsistencyNoneOldSegmentMoreDocs()
+ throws Exception {
+ // Test full upserts with consistency=NONE where old segment has more docs
+ // Using real segments instead of mocks to avoid complex data source
mocking
+ UpsertContext upsertContext =
+
_contextBuilder.setConsistencyMode(UpsertConfig.ConsistencyMode.NONE).setDropOutOfOrderRecord(true).build();
+
+ ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+ new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME,
0, upsertContext);
+
+ String segmentName = "test_segment";
+
+ // Create first real segment with 3 records
+ int[] primaryKeys1 = new int[]{10, 30, 40};
+ int[] timestamps1 = new int[]{1500, 3500, 4000};
+ ThreadSafeMutableRoaringBitmap validDocIds1 = new
ThreadSafeMutableRoaringBitmap();
+ ImmutableSegmentImpl segment1 = createRealSegment(segmentName,
primaryKeys1, timestamps1, validDocIds1);
+
+ upsertMetadataManager.addSegment(segment1);
+
+ Map<Object, RecordLocation> recordLocationMap =
upsertMetadataManager._primaryKeyToRecordLocationMap;
+ assertEquals(recordLocationMap.size(), 3);
+
+ // Create second real segment with 2 records (subset of first)
+ int[] primaryKeys2 = new int[]{10, 30};
+ int[] timestamps2 = new int[]{1500, 3500};
+ ThreadSafeMutableRoaringBitmap validDocIds2 = new
ThreadSafeMutableRoaringBitmap();
+ ImmutableSegmentImpl segment2 = createRealSegment(segmentName,
primaryKeys2, timestamps2, validDocIds2);
+
+ // Replace segment - RecordInfoReader will read real data from segment2
+ upsertMetadataManager.replaceSegment(segment2, segment1);
+
+ assertEquals(recordLocationMap.size(), 2);
+ checkRecordLocation(recordLocationMap, 10, segment2, 0, 1500,
HashFunction.NONE);
+ checkRecordLocation(recordLocationMap, 30, segment2, 1, 3500,
HashFunction.NONE);
+
assertEquals(segment2.getValidDocIds().getMutableRoaringBitmap().getCardinality(),
2);
+
+ upsertMetadataManager.stop();
+ upsertMetadataManager.close();
+
+ // Clean up real segments
+ segment1.destroy();
+ segment2.destroy();
+ }
+
+ @Test
+ public void testFullUpsertRegularConsistencyMode()
+ throws IOException {
+ // Test full upserts with regular consistency mode (not NONE) - no
reversion should occur
+ UpsertContext upsertContext =
_contextBuilder.setConsistencyMode(UpsertConfig.ConsistencyMode.SNAPSHOT).build();
+
+ ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+ new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME,
0, upsertContext);
+
+ int numRecords1 = 3;
+ int[] primaryKeys1 = new int[]{100, 200, 300};
+ int[] timestamps1 = new int[]{10000, 20000, 30000};
+ ThreadSafeMutableRoaringBitmap validDocIds1 = new
ThreadSafeMutableRoaringBitmap();
+ for (int i = 0; i < numRecords1; i++) {
+ validDocIds1.add(i);
+ }
+ List<PrimaryKey> primaryKeysList1 = getPrimaryKeyList(numRecords1,
primaryKeys1);
+ ImmutableSegmentImpl segment1 =
+ mockImmutableSegmentWithTimestamps(1, validDocIds1, null,
primaryKeysList1, timestamps1);
+ List<RecordInfo> recordInfoList1 =
+ getRecordInfoListWithIntegerComparison(numRecords1, primaryKeys1,
timestamps1, null);
+
+ upsertMetadataManager.addSegment(segment1, validDocIds1, null,
recordInfoList1.iterator());
+
+ Map<Object, RecordLocation> recordLocationMap =
upsertMetadataManager._primaryKeyToRecordLocationMap;
+ assertEquals(recordLocationMap.size(), 3);
+ int numRecords2 = 1;
+ int[] primaryKeys2 = new int[]{100};
+ int[] timestamps2 = new int[]{15000};
+ ThreadSafeMutableRoaringBitmap validDocIds2 = new
ThreadSafeMutableRoaringBitmap();
+ for (int i = 0; i < numRecords2; i++) {
+ validDocIds2.add(i);
+ }
+ List<PrimaryKey> primaryKeysList2 = getPrimaryKeyList(numRecords2,
primaryKeys2);
+ ImmutableSegmentImpl segment2 =
+ mockImmutableSegmentWithTimestamps(1, validDocIds2, null,
primaryKeysList2, timestamps2);
+
+ upsertMetadataManager.replaceSegment(segment2, segment1);
+
+ assertEquals(recordLocationMap.size(), 1);
+ checkRecordLocation(recordLocationMap, 100, segment2, 0, 15000,
HashFunction.NONE);
+
+
assertEquals(segment1.getValidDocIds().getMutableRoaringBitmap().getCardinality(),
2);
+
assertEquals(segment2.getValidDocIds().getMutableRoaringBitmap().getCardinality(),
1);
+
+ upsertMetadataManager.stop();
+ upsertMetadataManager.close();
+ }
+
+ @Test
+ public void testRevertOnlyAppliesForConsumingSegmentSeal()
+ throws IOException {
+ UpsertContext upsertContext =
+
_contextBuilder.setConsistencyMode(UpsertConfig.ConsistencyMode.NONE).setDropOutOfOrderRecord(true).build();
+
+ ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+ new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME,
0, upsertContext);
+
+ int[] mutablePrimaryKeys = new int[]{10, 20, 30};
+ ThreadSafeMutableRoaringBitmap validDocIdsMutable = new
ThreadSafeMutableRoaringBitmap();
+ MutableSegment mutableSegment = mockMutableSegmentWithDataSource(1,
validDocIdsMutable, null, mutablePrimaryKeys);
+
+ upsertMetadataManager.addRecord(mutableSegment, new
RecordInfo(makePrimaryKey(10), 0, new Integer(1000), false));
+ upsertMetadataManager.addRecord(mutableSegment, new
RecordInfo(makePrimaryKey(20), 1, new Integer(2000), false));
+ upsertMetadataManager.addRecord(mutableSegment, new
RecordInfo(makePrimaryKey(30), 2, new Integer(3000), false));
Review Comment:
Avoid using the deprecated `new Integer()` constructor. Use
`Integer.valueOf()` or autoboxing instead. The `new Integer()` constructor has
been deprecated since Java 9.
```suggestion
upsertMetadataManager.addRecord(mutableSegment, new
RecordInfo(makePrimaryKey(10), 0, Integer.valueOf(1000), false));
upsertMetadataManager.addRecord(mutableSegment, new
RecordInfo(makePrimaryKey(20), 1, Integer.valueOf(2000), false));
upsertMetadataManager.addRecord(mutableSegment, new
RecordInfo(makePrimaryKey(30), 2, Integer.valueOf(3000), false));
```
##########
pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java:
##########
@@ -808,26 +849,127 @@ private List<PrimaryKey> getPrimaryKeyList(int
numRecords, int[] primaryKeys) {
private static ImmutableSegmentImpl mockImmutableSegment(int sequenceNumber,
ThreadSafeMutableRoaringBitmap validDocIds, @Nullable
ThreadSafeMutableRoaringBitmap queryableDocIds,
List<PrimaryKey> primaryKeys) {
+ return mockImmutableSegmentWithTimestamps(sequenceNumber, validDocIds,
queryableDocIds, primaryKeys, null);
+ }
+
+ private static ImmutableSegmentImpl mockImmutableSegmentWithTimestamps(int
sequenceNumber,
+ ThreadSafeMutableRoaringBitmap validDocIds, @Nullable
ThreadSafeMutableRoaringBitmap queryableDocIds,
+ List<PrimaryKey> primaryKeys, @Nullable int[] timestamps) {
ImmutableSegmentImpl segment = mock(ImmutableSegmentImpl.class);
when(segment.getSegmentName()).thenReturn(getSegmentName(sequenceNumber));
when(segment.getValidDocIds()).thenReturn(validDocIds);
when(segment.getQueryableDocIds()).thenReturn(queryableDocIds);
- DataSource dataSource = mock(DataSource.class);
- when(segment.getDataSource(anyString())).thenReturn(dataSource);
- ForwardIndexReader forwardIndex = mock(ForwardIndexReader.class);
- when(forwardIndex.isSingleValue()).thenReturn(true);
- when(forwardIndex.getStoredType()).thenReturn(DataType.INT);
- when(forwardIndex.getInt(anyInt(), any())).thenAnswer(
- invocation ->
primaryKeys.get(invocation.getArgument(0)).getValues()[0]);
- when(dataSource.getForwardIndex()).thenReturn(forwardIndex);
+
+ // Enhanced mocking for RecordInfoReader to work properly
+ // Mock primary key column data source
+ DataSource primaryKeyDataSource = mock(DataSource.class);
+ ForwardIndexReader primaryKeyForwardIndex = mock(ForwardIndexReader.class);
+ when(primaryKeyForwardIndex.isSingleValue()).thenReturn(true);
+ when(primaryKeyForwardIndex.getStoredType()).thenReturn(DataType.INT);
+ when(primaryKeyForwardIndex.createContext()).thenReturn(null);
+ when(primaryKeyForwardIndex.getInt(anyInt(), any())).thenAnswer(invocation
-> {
+ int docId = invocation.getArgument(0);
+ if (primaryKeys != null && docId < primaryKeys.size()) {
+ return (Integer) primaryKeys.get(docId).getValues()[0];
+ }
+ return 1000 + docId; // Default fallback
+ });
+
when(primaryKeyDataSource.getForwardIndex()).thenReturn(primaryKeyForwardIndex);
+
+ // Mock comparison column data source
+ DataSource comparisonDataSource = mock(DataSource.class);
+ ForwardIndexReader comparisonForwardIndex = mock(ForwardIndexReader.class);
+ when(comparisonForwardIndex.isSingleValue()).thenReturn(true);
+ when(comparisonForwardIndex.getStoredType()).thenReturn(DataType.INT);
+ when(comparisonForwardIndex.createContext()).thenReturn(null);
+ when(comparisonForwardIndex.getInt(anyInt(), any())).thenAnswer(invocation
-> {
+ int docId = invocation.getArgument(0);
+ // Return actual timestamp values if provided, otherwise default values
+ if (timestamps != null && docId < timestamps.length) {
+ return timestamps[docId];
+ }
+ return 1000 + (docId * 100); // Default fallback
Review Comment:
The magic numbers `1000` and `100` are used as fallback values without
explanation. Consider extracting these to named constants like
`DEFAULT_TIMESTAMP_BASE` and `DEFAULT_TIMESTAMP_STEP` to improve code
maintainability and document the intent.
##########
pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java:
##########
@@ -808,26 +849,127 @@ private List<PrimaryKey> getPrimaryKeyList(int
numRecords, int[] primaryKeys) {
private static ImmutableSegmentImpl mockImmutableSegment(int sequenceNumber,
ThreadSafeMutableRoaringBitmap validDocIds, @Nullable
ThreadSafeMutableRoaringBitmap queryableDocIds,
List<PrimaryKey> primaryKeys) {
+ return mockImmutableSegmentWithTimestamps(sequenceNumber, validDocIds,
queryableDocIds, primaryKeys, null);
+ }
+
+ private static ImmutableSegmentImpl mockImmutableSegmentWithTimestamps(int
sequenceNumber,
+ ThreadSafeMutableRoaringBitmap validDocIds, @Nullable
ThreadSafeMutableRoaringBitmap queryableDocIds,
+ List<PrimaryKey> primaryKeys, @Nullable int[] timestamps) {
ImmutableSegmentImpl segment = mock(ImmutableSegmentImpl.class);
when(segment.getSegmentName()).thenReturn(getSegmentName(sequenceNumber));
when(segment.getValidDocIds()).thenReturn(validDocIds);
when(segment.getQueryableDocIds()).thenReturn(queryableDocIds);
- DataSource dataSource = mock(DataSource.class);
- when(segment.getDataSource(anyString())).thenReturn(dataSource);
- ForwardIndexReader forwardIndex = mock(ForwardIndexReader.class);
- when(forwardIndex.isSingleValue()).thenReturn(true);
- when(forwardIndex.getStoredType()).thenReturn(DataType.INT);
- when(forwardIndex.getInt(anyInt(), any())).thenAnswer(
- invocation ->
primaryKeys.get(invocation.getArgument(0)).getValues()[0]);
- when(dataSource.getForwardIndex()).thenReturn(forwardIndex);
+
+ // Enhanced mocking for RecordInfoReader to work properly
+ // Mock primary key column data source
+ DataSource primaryKeyDataSource = mock(DataSource.class);
+ ForwardIndexReader primaryKeyForwardIndex = mock(ForwardIndexReader.class);
+ when(primaryKeyForwardIndex.isSingleValue()).thenReturn(true);
+ when(primaryKeyForwardIndex.getStoredType()).thenReturn(DataType.INT);
+ when(primaryKeyForwardIndex.createContext()).thenReturn(null);
+ when(primaryKeyForwardIndex.getInt(anyInt(), any())).thenAnswer(invocation
-> {
+ int docId = invocation.getArgument(0);
+ if (primaryKeys != null && docId < primaryKeys.size()) {
+ return (Integer) primaryKeys.get(docId).getValues()[0];
+ }
+ return 1000 + docId; // Default fallback
Review Comment:
The magic number `1000` is used as a fallback value without explanation.
Consider extracting this to a named constant like `DEFAULT_PRIMARY_KEY_BASE` or
`FALLBACK_PRIMARY_KEY_OFFSET` to improve code maintainability and make the
intent clearer.
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -659,37 +665,107 @@ public void replaceSegment(ImmutableSegment segment,
@Nullable ThreadSafeMutable
validDocIdsForOldSegment = getValidDocIdsForOldSegment(oldSegment);
}
if (validDocIdsForOldSegment != null &&
!validDocIdsForOldSegment.isEmpty()) {
- int numKeysNotReplaced = validDocIdsForOldSegment.getCardinality();
- // Add the new metric tracking here
- if (_context.isDropOutOfOrderRecord() && _context.getConsistencyMode()
== UpsertConfig.ConsistencyMode.NONE) {
- // For Upsert tables when some of the records get dropped when
dropOutOfOrderRecord is enabled, we donot
- // store the original record location when keys are not replaced, this
can potentially cause inconsistencies
- // leading to some rows not getting dropped when reconsumed. This can
be caused when a consuming segment
- // that is consumed from a different server is replaced with the
existing segment which consumed rows ahead
- // of the other server
- _logger.warn(
- "Found {} primary keys not replaced when replacing segment: {} for
upsert table with dropOutOfOrderRecord"
- + " enabled with no consistency mode. This can potentially
cause inconsistency between replicas",
- numKeysNotReplaced, segmentName);
- _serverMetrics.addMeteredTableValue(_tableNameWithType,
ServerMeter.REALTIME_UPSERT_INCONSISTENT_ROWS,
- numKeysNotReplaced);
- } else if (_partialUpsertHandler != null) {
- // For partial-upsert table, because we do not restore the original
record location when removing the primary
- // keys not replaced, it can potentially cause inconsistency between
replicas. This can happen when a
- // consuming segment is replaced by a committed segment that is
consumed from a different server with
- // different records (some stream consumer cannot guarantee consuming
the messages in the same order/
- // when a segment is replaced with lesser consumed rows from the other
server).
- _logger.warn("Found {} primary keys not replaced when replacing
segment: {} for partial-upsert table. This "
- + "can potentially cause inconsistency between replicas",
numKeysNotReplaced, segmentName);
- _serverMetrics.addMeteredTableValue(_tableNameWithType,
ServerMeter.PARTIAL_UPSERT_KEYS_NOT_REPLACED,
- numKeysNotReplaced);
+ checkForInconsistencies(segment, validDocIds, queryableDocIds,
oldSegment, validDocIdsForOldSegment, segmentName);
+ removeSegment(oldSegment, validDocIdsForOldSegment);
+ }
+ }
+
+ void checkForInconsistencies(ImmutableSegment segment,
ThreadSafeMutableRoaringBitmap validDocIds,
+ ThreadSafeMutableRoaringBitmap queryableDocIds, IndexSegment oldSegment,
+ MutableRoaringBitmap validDocIdsForOldSegment, String segmentName) {
+ int numKeysNotReplaced = validDocIdsForOldSegment.getCardinality();
+ // Only apply revert logic when sealing a consuming (mutable) segment, not
for immutable segment replacement.
+ // Consuming segments are mutable, so we check if the old segment is NOT
an ImmutableSegment.
+ boolean isConsumingSegmentSeal = !(oldSegment instanceof ImmutableSegment);
+ if (isConsumingSegmentSeal && _context.isDropOutOfOrderRecord()
+ && _context.getConsistencyMode() == UpsertConfig.ConsistencyMode.NONE)
{
+ // For Upsert tables when some of the records get dropped when
dropOutOfOrderRecord is enabled, we donot
+ // store the original record location when keys are not replaced, this
can potentially cause inconsistencies
+ // leading to some rows not getting dropped when reconsumed. This can be
caused when a consuming segment
+ // that is consumed from a different server is replaced with the
existing segment which consumed rows ahead
+ // of the other server
+ _logger.warn("Found {} primary keys not replaced when sealing consuming
segment: {} for upsert table with "
+ + "dropOutOfOrderRecord enabled with no consistency mode. This
can potentially cause inconsistency "
+ + "between replicas. Reverting back metadata changes and
triggering segment replacement.",
+ numKeysNotReplaced, segmentName);
+ // Revert consuming segment pks to previous segment locations and
perform metadata replacement again with retry
+ revertSegmentUpsertMetadataWithRetry(segment, validDocIds,
queryableDocIds, oldSegment, segmentName);
+ } else if (isConsumingSegmentSeal && _partialUpsertHandler != null) {
+ // For partial-upsert table, because we do not restore the original
record location when removing the primary
+ // keys not replaced, it can potentially cause inconsistency between
replicas. This can happen when a
+ // consuming segment is replaced by a committed segment that is consumed
from a different server with
+ // different records (some stream consumer cannot guarantee consuming
the messages in the same order/
+ // when a segment is replaced with lesser consumed rows from the other
server).
+ _logger.warn("Found {} primary keys not replaced when sealing consuming
segment: {} for partial-upsert table. "
+ + "This can potentially cause inconsistency between replicas. "
+ + "Reverting metadata changes and triggering segment replacement.",
numKeysNotReplaced, segmentName);
+ // Revert consuming segment pks to previous segment locations and
perform metadata replacement again with retry
+ revertSegmentUpsertMetadataWithRetry(segment, validDocIds,
queryableDocIds, oldSegment, segmentName);
+ } else if (isConsumingSegmentSeal) {
+ _logger.info("Found {} primary keys not replaced when sealing consuming
segment: {}", numKeysNotReplaced,
+ segmentName);
+ } else {
+ // For immutable segment replacement (e.g., segment reload), just log
the info without revert
+ _logger.warn("Found {} primary keys not replaced when replacing
immutable segment: {}. "
+ + "Skipping revert as this is not a consuming segment.",
numKeysNotReplaced, segmentName);
+ }
+ }
+
+ /**
+ * Reverts segment upsert metadata and retries addOrReplaceSegment with a
maximum retry limit to prevent infinite
+ * recursion in case of persistent inconsistencies.
+ */
+ void revertSegmentUpsertMetadataWithRetry(ImmutableSegment segment,
ThreadSafeMutableRoaringBitmap validDocIds,
+ ThreadSafeMutableRoaringBitmap queryableDocIds, IndexSegment oldSegment,
String segmentName) {
+ for (int retryCount = 0; retryCount < MAX_UPSERT_REVERT_RETRIES;
retryCount++) {
+ revertCurrentSegmentUpsertMetadata(oldSegment, validDocIds,
queryableDocIds);
+ MutableRoaringBitmap validDocIdsForOldSegment =
getValidDocIdsForOldSegment(oldSegment);
+ try (UpsertUtils.RecordInfoReader recordInfoReader = new
UpsertUtils.RecordInfoReader(segment, _primaryKeyColumns,
+ _comparisonColumns, _deleteRecordColumn)) {
+ Iterator<RecordInfo> latestRecordInfoIterator =
+ UpsertUtils.getRecordInfoIterator(recordInfoReader,
segment.getSegmentMetadata().getTotalDocs());
+ addOrReplaceSegment((ImmutableSegmentImpl) segment, validDocIds,
queryableDocIds, latestRecordInfoIterator,
+ oldSegment, validDocIdsForOldSegment);
+ } catch (Exception e) {
+ throw new RuntimeException(
+ String.format("Caught exception while replacing segment metadata
during inconsistencies: %s, table: %s",
+ segmentName, _tableNameWithType), e);
+ }
+
+ validDocIdsForOldSegment = getValidDocIdsForOldSegment(oldSegment);
+ if (validDocIdsForOldSegment == null ||
validDocIdsForOldSegment.isEmpty()) {
+ _logger.info("Successfully resolved inconsistency for segment: {}
after {} retry attempt(s)", segmentName,
+ retryCount + 1);
+ return;
+ }
+
+ int numKeysStillNotReplaced = validDocIdsForOldSegment.getCardinality();
+ if (retryCount < MAX_UPSERT_REVERT_RETRIES - 1) {
+ _logger.warn("Retry {}/{}: Still found {} primary keys not replaced
for segment: {}. Retrying...",
+ retryCount + 1, MAX_UPSERT_REVERT_RETRIES,
numKeysStillNotReplaced, segmentName);
} else {
- _logger.info("Found {} primary keys not replaced when replacing
segment: {}", numKeysNotReplaced, segmentName);
+ _logger.error("Exhausted all {} retries for segment: {}. Found {}
primary keys still not replaced. "
+ + "Proceeding with current state which may cause
inconsistency.", MAX_UPSERT_REVERT_RETRIES,
+ segmentName,
+ numKeysStillNotReplaced);
Review Comment:
The log message spans multiple lines with inconsistent indentation. The
third parameter `segmentName` should be aligned with the other parameters for
better readability. Consider formatting as `_logger.error("Exhausted all {}
retries for segment: {}. Found {} primary keys still not replaced. Proceeding
with current state which may cause inconsistency.", MAX_UPSERT_REVERT_RETRIES,
segmentName, numKeysStillNotReplaced);`
```suggestion
+ "Proceeding with current state which may cause
inconsistency.", MAX_UPSERT_REVERT_RETRIES, segmentName,
numKeysStillNotReplaced);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]