Copilot commented on code in PR #17324:
URL: https://github.com/apache/pinot/pull/17324#discussion_r2592061869


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##########
@@ -264,18 +279,22 @@ protected boolean doAddRecord(MutableSegment segment, RecordInfo recordInfo) {
           if (currentRecordLocation != null) {
             // Existing primary key
 
+            IndexSegment currentSegment = currentRecordLocation.getSegment();
             // Update the record location when the new comparison value is greater than or equal to the current value.
             // Update the record location when there is a tie to keep the newer record.
             if (newComparisonValue.compareTo(currentRecordLocation.getComparisonValue()) >= 0) {
-              IndexSegment currentSegment = currentRecordLocation.getSegment();
               int currentDocId = currentRecordLocation.getDocId();
               if (segment == currentSegment) {

Review Comment:
   `currentSegment` is now retrieved before the comparison check at line 285, 
so `getSegment()` is called on every path through this block, including paths 
that never use the result. If the comparison fails frequently (out-of-order 
records), those calls are unnecessary. Consider retrieving `currentSegment` 
inside the `if` block at line 285, where it was originally located, and 
separately inside the `else` block at line 295, at the point where the new 
condition check actually needs it.
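
   A minimal sketch of the branch-scoped lookup, not the PR's actual code. 
`Location`, `sameSegment` and the `elseNeedsSegment` flag are hypothetical 
stand-ins; in particular, the flag only represents "the new condition in the 
`else` block needs the segment", whose real form is not visible in this hunk.

```java
// Hypothetical sketch; the types and the elseNeedsSegment flag are stand-ins,
// not taken from the Pinot code.
final class ScopedLookupSketch {
  /** Stand-in for RecordLocation that counts getSegment() calls. */
  static final class Location {
    private final Object _segment;
    private final int _comparisonValue;
    int _getSegmentCalls;

    Location(Object segment, int comparisonValue) {
      _segment = segment;
      _comparisonValue = comparisonValue;
    }

    Object getSegment() {
      _getSegmentCalls++;
      return _segment;
    }

    int getComparisonValue() {
      return _comparisonValue;
    }
  }

  /**
   * Returns whether the incoming record is in the same segment as the current
   * location. Returns false without any lookup when the out-of-order path
   * does not need the segment.
   */
  static boolean sameSegment(Object segment, Location current, int newValue, boolean elseNeedsSegment) {
    if (newValue >= current.getComparisonValue()) {
      // In-order or tie: the segment is needed here, so fetch it here.
      Object currentSegment = current.getSegment();
      return segment == currentSegment;
    } else if (elseNeedsSegment) {
      // Out-of-order: fetch only when the extra condition needs it.
      Object currentSegment = current.getSegment();
      return segment == currentSegment;
    }
    // Out-of-order and no segment check required: no lookup at all.
    return false;
  }
}
```

   Scoping the local to each branch keeps the lookup next to its use, at the 
cost of repeating one line.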



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -669,27 +672,42 @@ public void replaceSegment(ImmutableSegment segment, @Nullable ThreadSafeMutable
         // of the other server
         _logger.warn(
             "Found {} primary keys not replaced when replacing segment: {} for upsert table with dropOutOfOrderRecord"
-                + " enabled with no consistency mode. This can potentially cause inconsistency between replicas",
-            numKeysNotReplaced, segmentName);
+                + " enabled with no consistency mode. This can potentially cause inconsistency between replicas. "
+                + "Reverting metadata changes and triggering segment replacement.", numKeysNotReplaced, segmentName);
         _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.REALTIME_UPSERT_INCONSISTENT_ROWS,
             numKeysNotReplaced);
+        // Revert consuming segment pks to previous segment locations and perform metadata replacement again
+        revertCurrentSegmentUpsertMetadata();
+        addOrReplaceSegment((ImmutableSegmentImpl) segment, validDocIds, queryableDocIds, recordInfoIterator,
+            oldSegment, validDocIdsForOldSegment);

Review Comment:
   After reverting metadata, this path calls `addOrReplaceSegment()` 
recursively, and nothing bounds the recursion. If the retry hits the same 
inconsistency (`numKeysNotReplaced > 0`), the code calls 
`revertCurrentSegmentUpsertMetadata()` and `addOrReplaceSegment()` again, 
indefinitely. Add a retry counter or flag to limit the recursion depth, and 
handle a persistent inconsistency explicitly once the limit is reached.
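
   One possible shape for the bounded retry, as a sketch under stated 
assumptions rather than a drop-in patch. `ReplaceAttempt`, 
`replaceWithBoundedRetry` and `MAX_REPLACE_RETRIES` are hypothetical names; 
the attempt stands in for one replacement pass that reports how many keys were 
not replaced, and the `Runnable` stands in for the revert step.

```java
// Hypothetical sketch: none of these names come from the Pinot codebase.
final class BoundedReplaceSketch {
  // Assumed cap on revert-and-retry attempts; the right value is a judgment call.
  static final int MAX_REPLACE_RETRIES = 1;

  /** Stand-in for one replacement pass; returns the number of keys not replaced. */
  interface ReplaceAttempt {
    int run();
  }

  /**
   * Runs the replacement, then reverts and retries at most MAX_REPLACE_RETRIES
   * times. Returns the number of keys still not replaced after the last attempt.
   */
  static int replaceWithBoundedRetry(ReplaceAttempt attempt, Runnable revert) {
    int numKeysNotReplaced = attempt.run();
    int retries = 0;
    while (numKeysNotReplaced > 0 && retries < MAX_REPLACE_RETRIES) {
      revert.run();
      numKeysNotReplaced = attempt.run();
      retries++;
    }
    // A non-zero result means the inconsistency persisted; the caller should
    // log and meter it rather than retry again.
    return numKeysNotReplaced;
  }
}
```

   A loop with an explicit counter keeps the stack depth constant and makes the 
give-up path visible to the caller, which a recursive call without a depth 
parameter does not.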



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##########
@@ -245,6 +248,18 @@ public void doRemoveExpiredPrimaryKeys() {
     }
   }
 

Review Comment:
   The method `revertCurrentSegmentUpsertMetadata()` is missing the `@Override` 
annotation while `eraseKeyToPreviousLocationMap()` at line 259 has it. Both 
methods override base class methods. Add the `@Override` annotation to 
`revertCurrentSegmentUpsertMetadata()` for consistency.
   ```suggestion
   
     @Override
   ```
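
   Beyond consistency, `@Override` gives a compile-time check: if the base 
method is later renamed or its signature changes, the annotated subclass method 
fails to compile instead of silently becoming an unrelated method. A small 
illustration with hypothetical stand-in classes (`BaseManagerSketch` and 
`ConcurrentMapManagerSketch` are not the real Pinot types, and the base method 
is assumed to be non-abstract here):

```java
// Hypothetical stand-ins for the base and concrete metadata managers.
abstract class BaseManagerSketch {
  protected void revertCurrentSegmentUpsertMetadata() {
    // Base behavior in this sketch: nothing to revert.
  }
}

final class ConcurrentMapManagerSketch extends BaseManagerSketch {
  boolean _reverted;

  // With @Override, renaming or re-typing the base method turns this into a
  // compile error instead of leaving an unrelated method behind.
  @Override
  protected void revertCurrentSegmentUpsertMetadata() {
    _reverted = true;
  }
}
```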



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##########
@@ -52,7 +52,10 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
   private final Map<String, Object> _reuseMergeResultHolder = new HashMap<>();
 
   @VisibleForTesting
-  final ConcurrentHashMap<Object, RecordLocation> _primaryKeyToRecordLocationMap = new ConcurrentHashMap<>();
+  final ConcurrentHashMap<Object, RecordLocation> _previousKeyToRecordLocationMap = new ConcurrentHashMap<>();
+
+  final ConcurrentHashMap<Object, ConcurrentMapPartitionUpsertMetadataManager.RecordLocation>
+      _primaryKeyToRecordLocationMap = new ConcurrentHashMap<>();
 

Review Comment:
   [nitpick] The new `_previousKeyToRecordLocationMap` is declared ahead of 
`_primaryKeyToRecordLocationMap` without clear justification. This makes the 
code harder to follow, since the primary map now comes after the auxiliary 
map. Consider declaring `_primaryKeyToRecordLocationMap` first, as before, or 
add a comment explaining why the auxiliary map has to come first if there is a 
dependency.
   ```suggestion
     final ConcurrentHashMap<Object, ConcurrentMapPartitionUpsertMetadataManager.RecordLocation>
         _primaryKeyToRecordLocationMap = new ConcurrentHashMap<>();
   
     final ConcurrentHashMap<Object, RecordLocation> _previousKeyToRecordLocationMap = new ConcurrentHashMap<>();
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

