xiangfu0 commented on code in PR #17324:
URL: https://github.com/apache/pinot/pull/17324#discussion_r2592189119
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -669,27 +672,42 @@ public void replaceSegment(ImmutableSegment segment, @Nullable ThreadSafeMutable
// of the other server
_logger.warn(
"Found {} primary keys not replaced when replacing segment: {} for upsert table with dropOutOfOrderRecord"
- + " enabled with no consistency mode. This can potentially cause inconsistency between replicas",
- numKeysNotReplaced, segmentName);
+ + " enabled with no consistency mode. This can potentially cause inconsistency between replicas. "
+ + "Reverting metadata changes and triggering segment replacement.", numKeysNotReplaced, segmentName);
_serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.REALTIME_UPSERT_INCONSISTENT_ROWS,
numKeysNotReplaced);
+ // Revert consuming segment pks to previous segment locations and perform metadata replacement again
+ revertCurrentSegmentUpsertMetadata();
+ addOrReplaceSegment((ImmutableSegmentImpl) segment, validDocIds, queryableDocIds, recordInfoIterator,
Review Comment:
This `recordInfoIterator` was already exhausted at line 656, so this second `addOrReplaceSegment` call will have no records left to iterate over.
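
To illustrate the concern, here is a minimal standalone sketch using a plain `java.util.Iterator` over strings rather than Pinot's actual record iterator. The class name `ExhaustedIteratorSketch` and the helper `drain` are hypothetical and exist only for this example. Once the first pass drains the iterator, a second pass over the same instance sees nothing, so the retry would presumably need a fresh iterator or a buffered copy of the records.

```java
import java.util.Iterator;
import java.util.List;

public class ExhaustedIteratorSketch {
  // Consumes every remaining element and returns how many were seen.
  public static int drain(Iterator<String> it) {
    int count = 0;
    while (it.hasNext()) {
      it.next();
      count++;
    }
    return count;
  }

  public static void main(String[] args) {
    List<String> records = List.of("pk1", "pk2", "pk3");
    Iterator<String> it = records.iterator();

    int firstPass = drain(it);                   // consumes all three records
    int secondPass = drain(it);                  // same iterator: nothing left
    int freshPass = drain(records.iterator());   // a new iterator starts over

    System.out.println(firstPass + " " + secondPass + " " + freshPass); // prints "3 0 3"
  }
}
```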
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##########
@@ -245,6 +248,18 @@ public void doRemoveExpiredPrimaryKeys() {
}
}
+ protected void revertCurrentSegmentUpsertMetadata() {
+ _logger.info("Reverting Upsert metadata for {} keys", _previousKeyToRecordLocationMap.size());
+ // Revert to previous locations present in other segments
+ // For the newly added keys into the segment, it will be considered new when metadata is replaced again
+ _primaryKeyToRecordLocationMap.putAll(_previousKeyToRecordLocationMap);
Review Comment:
Will you also update the previous segments' valid doc IDs?
After the rollback, the old segments' active keys are no longer queryable.
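
A rough sketch of what I mean, under the assumption that the old segment's valid-doc bit was cleared when the consuming segment took over the key. Everything here is a hypothetical stand-in: `java.util.BitSet` replaces the real valid-doc-ID bitmap, and `Location` and `revert` are illustrative names, not Pinot APIs. A bare `putAll` would restore only the key map; the revert below also re-marks the restored doc as valid and invalidates the doc it displaced.

```java
import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;

public class RevertValidDocIdsSketch {
  // Hypothetical stand-in for a record location: the owning segment's
  // valid-doc bitmap plus the doc ID within that segment.
  public static final class Location {
    final BitSet validDocIds;
    final int docId;

    public Location(BitSet validDocIds, int docId) {
      this.validDocIds = validDocIds;
      this.docId = docId;
    }
  }

  // Restores previous locations in the key map and keeps the bitmaps in sync.
  public static void revert(Map<String, Location> current, Map<String, Location> previous) {
    for (Map.Entry<String, Location> entry : previous.entrySet()) {
      Location restored = entry.getValue();
      Location displaced = current.put(entry.getKey(), restored);
      if (displaced != null && displaced != restored) {
        // The doc that had taken over the key is no longer the valid one.
        displaced.validDocIds.clear(displaced.docId);
      }
      // Without this step the old segment's doc stays invalid after the rollback.
      restored.validDocIds.set(restored.docId);
    }
  }

  public static void main(String[] args) {
    BitSet oldSegmentValidDocIds = new BitSet();
    BitSet consumingValidDocIds = new BitSet();
    Map<String, Location> current = new HashMap<>();
    Map<String, Location> previous = new HashMap<>();

    // pk1 initially lives in the old segment at doc 5.
    Location oldLocation = new Location(oldSegmentValidDocIds, 5);
    oldSegmentValidDocIds.set(5);
    current.put("pk1", oldLocation);

    // The consuming segment takes over pk1; the old doc is invalidated and remembered.
    oldSegmentValidDocIds.clear(5);
    consumingValidDocIds.set(0);
    previous.put("pk1", oldLocation);
    current.put("pk1", new Location(consumingValidDocIds, 0));

    revert(current, previous);
    System.out.println("old doc valid again: " + oldSegmentValidDocIds.get(5));
    System.out.println("consuming doc still valid: " + consumingValidDocIds.get(0));
  }
}
```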
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -669,27 +672,42 @@ public void replaceSegment(ImmutableSegment segment, @Nullable ThreadSafeMutable
// of the other server
_logger.warn(
"Found {} primary keys not replaced when replacing segment: {} for upsert table with dropOutOfOrderRecord"
- + " enabled with no consistency mode. This can potentially cause inconsistency between replicas",
- numKeysNotReplaced, segmentName);
+ + " enabled with no consistency mode. This can potentially cause inconsistency between replicas. "
+ + "Reverting metadata changes and triggering segment replacement.", numKeysNotReplaced, segmentName);
_serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.REALTIME_UPSERT_INCONSISTENT_ROWS,
numKeysNotReplaced);
+ // Revert consuming segment pks to previous segment locations and perform metadata replacement again
+ revertCurrentSegmentUpsertMetadata();
+ addOrReplaceSegment((ImmutableSegmentImpl) segment, validDocIds, queryableDocIds, recordInfoIterator,
+ oldSegment, validDocIdsForOldSegment);
} else if (_partialUpsertHandler != null) {
// For partial-upsert table, because we do not restore the original record location when removing the primary
// keys not replaced, it can potentially cause inconsistency between replicas. This can happen when a
// consuming segment is replaced by a committed segment that is consumed from a different server with
// different records (some stream consumer cannot guarantee consuming the messages in the same order/
// when a segment is replaced with lesser consumed rows from the other server).
_logger.warn("Found {} primary keys not replaced when replacing segment: {} for partial-upsert table. This "
- + "can potentially cause inconsistency between replicas", numKeysNotReplaced, segmentName);
+ + "can potentially cause inconsistency between replicas. "
+ + "Reverting metadata changes and triggering segment replacement.", numKeysNotReplaced, segmentName);
_serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.PARTIAL_UPSERT_KEYS_NOT_REPLACED,
numKeysNotReplaced);
+ // Revert consuming segment pks to previous segment locations and perform metadata replacement again
+ revertCurrentSegmentUpsertMetadata();
+ addOrReplaceSegment((ImmutableSegmentImpl) segment, validDocIds, queryableDocIds, recordInfoIterator,
+ oldSegment, validDocIdsForOldSegment);
} else {
_logger.info("Found {} primary keys not replaced when replacing segment: {}", numKeysNotReplaced, segmentName);
}
removeSegment(oldSegment, validDocIdsForOldSegment);
}
}
+ protected void eraseKeyToPreviousLocationMap() {
Review Comment:
Should this be abstract?
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##########
@@ -52,7 +52,10 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
private final Map<String, Object> _reuseMergeResultHolder = new HashMap<>();
@VisibleForTesting
- final ConcurrentHashMap<Object, RecordLocation> _primaryKeyToRecordLocationMap = new ConcurrentHashMap<>();
+ final ConcurrentHashMap<Object, RecordLocation> _previousKeyToRecordLocationMap = new ConcurrentHashMap<>();
+
+ final ConcurrentHashMap<Object, ConcurrentMapPartitionUpsertMetadataManager.RecordLocation>
Review Comment:
Should this be just `RecordLocation`, as in the declaration above?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]