KKcorps commented on code in PR #9095:
URL: https://github.com/apache/pinot/pull/9095#discussion_r928155128


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java:
##########
@@ -208,11 +275,82 @@ public void addRecord(IndexSegment segment, RecordInfo 
recordInfo) {
             return new RecordLocation(segment, recordInfo.getDocId(), 
recordInfo.getComparisonValue());
           }
         });
+
     // Update metrics
     _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, 
ServerGauge.UPSERT_PRIMARY_KEYS_COUNT,
         _primaryKeyToRecordLocationMap.size());
   }
 
+  /**
+   * Replaces the upsert metadata for the old segment with the new immutable 
segment.
+   */
+  public void replaceSegment(ImmutableSegment newSegment, IndexSegment 
oldSegment) {
+    String segmentName = newSegment.getSegmentName();
+    
Preconditions.checkArgument(segmentName.equals(oldSegment.getSegmentName()),
+        "Cannot replace segment with different name for table: {}, old 
segment: {}, new segment: {}",
+        _tableNameWithType, oldSegment.getSegmentName(), segmentName);
+    _logger.info("Replacing upsert metadata for {} segment: {}",
+        oldSegment instanceof ImmutableSegment ? "immutable" : "mutable", 
segmentName);
+
+    addSegment(newSegment);
+
+    MutableRoaringBitmap validDocIds =
+        oldSegment.getValidDocIds() != null ? 
oldSegment.getValidDocIds().getMutableRoaringBitmap() : null;
+    if (validDocIds != null && !validDocIds.isEmpty()) {
+      int numDocsNotReplaced = validDocIds.getCardinality();
+      if (_partialUpsertHandler != null) {
+        _logger.error("Got {} primary keys not replaced when replacing 
segment: {} for partial upsert table. This can "
+            + "potentially cause inconsistency between replicas", 
numDocsNotReplaced, segmentName);
+        _serverMetrics.addMeteredTableValue(_tableNameWithType, 
ServerMeter.PARTIAL_UPSERT_ROWS_NOT_REPLACED,
+            numDocsNotReplaced);
+      } else {
+        _logger.info("Got {} primary keys not replaced when replacing segment: 
{}", numDocsNotReplaced, segmentName);
+      }
+      removeSegment(oldSegment);
+    }
+
+    _logger.info("Finish replacing upsert metadata for segment: {}", 
segmentName);
+  }
+
+  /**
+   * Removes the upsert metadata for the given segment.
+   */
+  public void removeSegment(IndexSegment segment) {
+    String segmentName = segment.getSegmentName();
+    _logger.info("Removing upsert metadata for segment: {}, primary key count: 
{}", segmentName,
+        _primaryKeyToRecordLocationMap.size());
+
+    MutableRoaringBitmap validDocIds =
+        segment.getValidDocIds() != null ? 
segment.getValidDocIds().getMutableRoaringBitmap() : null;
+    if (validDocIds == null || validDocIds.isEmpty()) {
+      _logger.info("Skipping removing upsert metadata for segment without 
valid docs: {}", segmentName);
+      return;
+    }
+
+    _logger.info("Trying to remove {} primary keys from upsert metadata for 
segment: {}", validDocIds.getCardinality(),

Review Comment:
   nit: This should be a debug log.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to