Jackie-Jiang commented on a change in pull request #6167: URL: https://github.com/apache/incubator-pinot/pull/6167#discussion_r509551017
########## File path: pinot-core/src/main/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManager.java ########## @@ -29,97 +30,142 @@ /** * Manages the upsert metadata per partition. + * <p>For multiple records with the same timestamp, there is no guarantee on which record to be preserved. + * <p>There will be short term inconsistency when updating the upsert metadata, but should be consistent after the + * operation is done: + * <ul> + * <li> + * When updating a new record, it first removes the doc id from the current location, then update the new location. + * </li> + * <li> + * When adding a new segment, it removes the doc ids from the current locations before the segment being added to + * the RealtimeTableDataManager. + * </li> + * <li> + * When replacing an existing segment, the updates applied to the new segment won't be reflected to the replaced + * segment. + * </li> + * </ul> */ @ThreadSafe public class PartitionUpsertMetadataManager { private static final Logger LOGGER = LoggerFactory.getLogger(PartitionUpsertMetadataManager.class); // TODO(upset): consider an off-heap KV store to persist this index to improve the recovery speed. - private final ConcurrentHashMap<PrimaryKey, RecordLocation> _primaryKeyToRecordLocationMap = - new ConcurrentHashMap<>(); - // the mapping between the (sealed) segment and its validDocuments - private final ConcurrentHashMap<String, ThreadSafeMutableRoaringBitmap> _segmentToValidDocIdsMap = - new ConcurrentHashMap<>(); + @VisibleForTesting + final ConcurrentHashMap<PrimaryKey, RecordLocation> _primaryKeyToRecordLocationMap = new ConcurrentHashMap<>(); /** - * Creates the valid doc ids for the given (immutable) segment. + * Initializes the upsert metadata for the given immutable segment, returns the valid doc ids for the segment. 
*/ - public ThreadSafeMutableRoaringBitmap createValidDocIds(String segmentName) { - LOGGER.info("Creating valid doc ids for segment: {}", segmentName); + public ThreadSafeMutableRoaringBitmap addSegment(String segmentName, Iterator<RecordInfo> recordInfoIterator) { + LOGGER.info("Adding upsert metadata for segment: {}", segmentName); + ThreadSafeMutableRoaringBitmap validDocIds = new ThreadSafeMutableRoaringBitmap(); - if (_segmentToValidDocIdsMap.put(segmentName, validDocIds) != null) { - LOGGER.warn("Valid doc ids exist for segment: {}, replacing it", segmentName); + while (recordInfoIterator.hasNext()) { + RecordInfo recordInfo = recordInfoIterator.next(); + _primaryKeyToRecordLocationMap.compute(recordInfo._primaryKey, (primaryKey, currentRecordLocation) -> { + if (currentRecordLocation != null) { + // Existing primary key + + if (segmentName.equals(currentRecordLocation.getSegmentName())) { + // The current record location has the same segment name + + if (validDocIds == currentRecordLocation.getValidDocIds()) { + // The current record location is pointing to the new segment being loaded + + // Update the record location when getting a newer timestamp + if (recordInfo._timestamp > currentRecordLocation.getTimestamp()) { + validDocIds.remove(currentRecordLocation.getDocId()); + validDocIds.add(recordInfo._docId); + return new RecordLocation(segmentName, recordInfo._docId, recordInfo._timestamp, validDocIds); + } + } else { + // The current record location is pointing to the old segment being replaced. This could happen when + // committing a consuming segment, or reloading a completed segment. + + // Update the record location when the new timestamp is greater than or equal to the current timestamp. + // Update the record location when there is a tie because the record locations should point to the new + // segment instead of the old segment being replaced. Also, do not update the valid doc ids for the old + // segment because it has not been replaced yet. 
+ if (recordInfo._timestamp >= currentRecordLocation.getTimestamp()) { + validDocIds.add(recordInfo._docId); + return new RecordLocation(segmentName, recordInfo._docId, recordInfo._timestamp, validDocIds); + } + } + return currentRecordLocation; + } + + // Update the record location when getting a newer timestamp + if (recordInfo._timestamp > currentRecordLocation.getTimestamp()) { + currentRecordLocation.getValidDocIds().remove(currentRecordLocation.getDocId()); + validDocIds.add(recordInfo._docId); + return new RecordLocation(segmentName, recordInfo._docId, recordInfo._timestamp, validDocIds); + } else { + return currentRecordLocation; + } + } else { + // New primary key + validDocIds.add(recordInfo._docId); + return new RecordLocation(segmentName, recordInfo._docId, recordInfo._timestamp, validDocIds); + } + }); } return validDocIds; } /** - * Returns the valid doc ids for the given (immutable) segment. - */ - public ThreadSafeMutableRoaringBitmap getValidDocIds(String segmentName) { - return Preconditions - .checkNotNull(_segmentToValidDocIdsMap.get(segmentName), "Failed to find valid doc ids for segment: %s", - segmentName); - } - - /** - * Updates the record location of the given primary key if the given record location is newer than the current record - * location. Also updates the valid doc ids accordingly if the record location is updated. + * Updates the upsert metadata for a new consumed record in the given consuming segment. 
*/ - public void updateRecordLocation(PrimaryKey primaryKey, RecordLocation recordLocation, + public synchronized void updateRecord(String segmentName, RecordInfo recordInfo, ThreadSafeMutableRoaringBitmap validDocIds) { - _primaryKeyToRecordLocationMap.compute(primaryKey, (k, v) -> { - if (v != null) { + _primaryKeyToRecordLocationMap.compute(recordInfo._primaryKey, (primaryKey, currentRecordLocation) -> { + if (currentRecordLocation != null) { // Existing primary key - if (recordLocation.getTimestamp() >= v.getTimestamp()) { - // Update the record location - // NOTE: Update the record location when there is a tie on the timestamp because during the segment - // commitment, when loading the committed segment, it should replace the old record locations in case - // the order of records changed. - - // Remove the doc from the valid doc ids of the previous location - if (v.isConsuming()) { - // Previous location is a consuming segment, whose valid doc ids are maintained locally. Only update the - // valid doc ids when the update is from the same segment. 
- if (recordLocation.isConsuming() && recordLocation.getSegmentName().equals(v.getSegmentName())) { - validDocIds.remove(v.getDocId()); - } - } else { - ThreadSafeMutableRoaringBitmap validDocIdsForPreviousLocation = - _segmentToValidDocIdsMap.get(v.getSegmentName()); - if (validDocIdsForPreviousLocation != null) { - validDocIdsForPreviousLocation.remove(v.getDocId()); - } else { - LOGGER.warn("Failed to find valid doc ids for previous location: {}", v.getSegmentName()); - } - } - - validDocIds.add(recordLocation.getDocId()); - return recordLocation; + // Update the record location when getting a newer timestamp + if (recordInfo._timestamp > currentRecordLocation.getTimestamp()) { + currentRecordLocation.getValidDocIds().remove(currentRecordLocation.getDocId()); + validDocIds.add(recordInfo._docId); + return new RecordLocation(segmentName, recordInfo._docId, recordInfo._timestamp, validDocIds); } else { - // No need to update - return v; + return currentRecordLocation; } } else { // New primary key - validDocIds.add(recordLocation.getDocId()); - return recordLocation; + validDocIds.add(recordInfo._docId); + return new RecordLocation(segmentName, recordInfo._docId, recordInfo._timestamp, validDocIds); } }); } /** - * Removes the upsert metadata for the given segment. + * Removes the upsert metadata for the given immutable segment. No need to remove the upsert metadata for the + * consuming segment because it should be replaced by the committed segment. */ - public void removeSegment(String segmentName) { + public synchronized void removeSegment(String segmentName, ThreadSafeMutableRoaringBitmap validDocIds) { LOGGER.info("Removing upsert metadata for segment: {}", segmentName); - _primaryKeyToRecordLocationMap.forEach((k, v) -> { - if (v.getSegmentName().equals(segmentName)) { - // NOTE: Check and remove to prevent removing the key that is just updated. 
- _primaryKeyToRecordLocationMap.remove(k, v); - } - }); - _segmentToValidDocIdsMap.remove(segmentName); + + if (!validDocIds.getMutableRoaringBitmap().isEmpty()) { Review comment: Yes, it removes an entry only when the entry's valid doc ids reference is the same instance as the one passed in, so it won't touch the upsert metadata of other segments. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org