Copilot commented on code in PR #17324:
URL: https://github.com/apache/pinot/pull/17324#discussion_r2592061869
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##########
@@ -264,18 +279,22 @@ protected boolean doAddRecord(MutableSegment segment, RecordInfo recordInfo) {
if (currentRecordLocation != null) {
// Existing primary key
+ IndexSegment currentSegment = currentRecordLocation.getSegment();
// Update the record location when the new comparison value is greater than or equal to the current value.
// Update the record location when there is a tie to keep the newer record.
if (newComparisonValue.compareTo(currentRecordLocation.getComparisonValue()) >= 0) {
- IndexSegment currentSegment = currentRecordLocation.getSegment();
int currentDocId = currentRecordLocation.getDocId();
if (segment == currentSegment) {
Review Comment:
The variable `currentSegment` is now retrieved before the comparison check
at line 285, so `getSegment()` is called on every invocation, including on
paths that never read the result. If the comparison fails frequently
(out-of-order records), those calls are unnecessary. Consider retrieving
`currentSegment` inside the `if` block at line 285, where it was originally
located, and separately inside the `else` block at line 295, where the new
condition check actually needs it.
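The scoping suggested above can be sketched as follows. This is a minimal, self-contained illustration rather than the PR's code: `LazySegmentLookupSketch`, its `RecordLocation` stand-in, the `checkOutOfOrderSegment` flag, and the returned labels are all hypothetical, and the sketch assumes the `else` branch only needs the segment under some additional condition.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class LazySegmentLookupSketch {

  /** Hypothetical stand-in for the real RecordLocation; it counts getSegment() calls. */
  static final class RecordLocation {
    private final Object _segment;
    private final long _comparisonValue;
    private final AtomicInteger _getSegmentCalls = new AtomicInteger();

    RecordLocation(Object segment, long comparisonValue) {
      _segment = segment;
      _comparisonValue = comparisonValue;
    }

    Object getSegment() {
      _getSegmentCalls.incrementAndGet();
      return _segment;
    }

    long getComparisonValue() {
      return _comparisonValue;
    }

    int getSegmentCalls() {
      return _getSegmentCalls.get();
    }
  }

  /**
   * Looks up the current segment only in the branches that read it.
   * The checkOutOfOrderSegment flag is an assumed placeholder for whatever
   * condition guards the out-of-order branch in the real code.
   */
  static String resolve(Object segment, RecordLocation current, long newComparisonValue,
      boolean checkOutOfOrderSegment) {
    if (newComparisonValue >= current.getComparisonValue()) {
      // In-order record (or tie): the segment is needed to decide how to update.
      Object currentSegment = current.getSegment();
      return segment == currentSegment ? "update-in-place" : "move-to-new-segment";
    } else if (checkOutOfOrderSegment) {
      // Out-of-order record that still needs the segment for an extra check.
      Object currentSegment = current.getSegment();
      return segment == currentSegment ? "out-of-order-same-segment" : "out-of-order-other-segment";
    }
    // Out-of-order record with nothing to check: getSegment() is never called.
    return "dropped";
  }

  public static void main(String[] args) {
    Object segment = new Object();
    RecordLocation location = new RecordLocation(segment, 10L);
    System.out.println(resolve(segment, location, 5L, false));
    System.out.println("getSegment() calls: " + location.getSegmentCalls());
  }
}
```

Whether this is worth the duplicated lookup depends on how often the out-of-order path is taken and on what `getSegment()` costs in the real class, neither of which the sketch measures.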
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -669,27 +672,42 @@ public void replaceSegment(ImmutableSegment segment, @Nullable ThreadSafeMutable
// of the other server
_logger.warn(
"Found {} primary keys not replaced when replacing segment: {} for upsert table with dropOutOfOrderRecord"
- + " enabled with no consistency mode. This can potentially cause inconsistency between replicas",
- numKeysNotReplaced, segmentName);
+ + " enabled with no consistency mode. This can potentially cause inconsistency between replicas. "
+ + "Reverting metadata changes and triggering segment replacement.", numKeysNotReplaced, segmentName);
_serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.REALTIME_UPSERT_INCONSISTENT_ROWS,
numKeysNotReplaced);
+ // Revert consuming segment pks to previous segment locations and perform metadata replacement again
+ revertCurrentSegmentUpsertMetadata();
+ addOrReplaceSegment((ImmutableSegmentImpl) segment, validDocIds, queryableDocIds, recordInfoIterator,
+ oldSegment, validDocIdsForOldSegment);
Review Comment:
After reverting the metadata, `addOrReplaceSegment()` is called recursively,
and nothing prevents infinite recursion if the inconsistency persists on the
retry attempt. If the same inconsistency occurs again
(`numKeysNotReplaced > 0`), the code calls
`revertCurrentSegmentUpsertMetadata()` and `addOrReplaceSegment()` again,
indefinitely. Add a retry counter or flag to limit the recursion depth, and
handle a persistent inconsistency explicitly.
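One way to bound the retry is sketched below as a loop with an attempt limit. This is a hedged illustration, not Pinot code: `BoundedReplaceRetrySketch`, `replaceWithBoundedRetry`, and the `maxAttempts` parameter are hypothetical names, the `IntSupplier` stands in for one replacement attempt that reports `numKeysNotReplaced`, and the `Runnable` stands in for `revertCurrentSegmentUpsertMetadata()`.

```java
import java.util.function.IntSupplier;

final class BoundedReplaceRetrySketch {

  /**
   * Runs replaceAttempt until it reports zero unreplaced keys or maxAttempts is reached.
   *
   * @param replaceAttempt hypothetical stand-in for one replacement pass; returns numKeysNotReplaced
   * @param revert hypothetical stand-in for reverting the metadata before the next attempt
   * @param maxAttempts upper bound on the number of replacement passes (assumed to be configurable)
   * @return the number of attempts that were made
   * @throws IllegalStateException if keys are still not replaced after the last attempt
   */
  static int replaceWithBoundedRetry(IntSupplier replaceAttempt, Runnable revert, int maxAttempts) {
    if (maxAttempts < 1) {
      throw new IllegalArgumentException("maxAttempts must be at least 1");
    }
    int numKeysNotReplaced = 0;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      numKeysNotReplaced = replaceAttempt.getAsInt();
      if (numKeysNotReplaced == 0) {
        return attempt;
      }
      // Revert only when another attempt will follow.
      if (attempt < maxAttempts) {
        revert.run();
      }
    }
    // Persistent inconsistency: surface it instead of retrying forever.
    throw new IllegalStateException(
        numKeysNotReplaced + " primary keys still not replaced after " + maxAttempts + " attempts");
  }

  public static void main(String[] args) {
    int[] calls = {0};
    // The first pass leaves 3 keys unreplaced; the second pass succeeds.
    int attempts = replaceWithBoundedRetry(() -> calls[0]++ == 0 ? 3 : 0, () -> { }, 2);
    System.out.println("attempts = " + attempts);
  }
}
```

Throwing on the final failure is only one option; logging and emitting the existing metric without a further retry would bound the recursion equally well.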
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##########
@@ -245,6 +248,18 @@ public void doRemoveExpiredPrimaryKeys() {
}
}
Review Comment:
The method `revertCurrentSegmentUpsertMetadata()` is missing the `@Override`
annotation while `eraseKeyToPreviousLocationMap()` at line 259 has it. Both
methods override base class methods. Add the `@Override` annotation to
`revertCurrentSegmentUpsertMetadata()` for consistency.
```suggestion
@Override
```
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##########
@@ -52,7 +52,10 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
private final Map<String, Object> _reuseMergeResultHolder = new HashMap<>();
@VisibleForTesting
- final ConcurrentHashMap<Object, RecordLocation> _primaryKeyToRecordLocationMap = new ConcurrentHashMap<>();
+ final ConcurrentHashMap<Object, RecordLocation> _previousKeyToRecordLocationMap = new ConcurrentHashMap<>();
+
+ final ConcurrentHashMap<Object, ConcurrentMapPartitionUpsertMetadataManager.RecordLocation>
+ _primaryKeyToRecordLocationMap = new ConcurrentHashMap<>();
Review Comment:
[nitpick] The declaration order of `_previousKeyToRecordLocationMap` and
`_primaryKeyToRecordLocationMap` has been swapped without clear justification.
This reordering makes the code harder to follow since the primary map is now
declared after the auxiliary map. Consider maintaining the original order with
`_primaryKeyToRecordLocationMap` declared first, or add a comment explaining
why the auxiliary map needs to be declared first if there's a dependency.
```suggestion
final ConcurrentHashMap<Object, ConcurrentMapPartitionUpsertMetadataManager.RecordLocation>
_primaryKeyToRecordLocationMap = new ConcurrentHashMap<>();
final ConcurrentHashMap<Object, RecordLocation> _previousKeyToRecordLocationMap = new ConcurrentHashMap<>();
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]