Jackie-Jiang commented on a change in pull request #6167:
URL: https://github.com/apache/incubator-pinot/pull/6167#discussion_r509551017



##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManager.java
##########
@@ -29,97 +30,142 @@
 
 /**
  * Manages the upsert metadata per partition.
+ * <p>For multiple records with the same timestamp, there is no guarantee 
about which record will be preserved.
+ * <p>There will be short-term inconsistency when updating the upsert 
metadata, but it should be consistent after the
+ * operation is done:
+ * <ul>
+ *   <li>
+ *     When updating a new record, it first removes the doc id from the 
current location, then updates the new location.
+ *   </li>
+ *   <li>
+ *     When adding a new segment, it removes the doc ids from the current 
locations before the segment is added to
+ *     the RealtimeTableDataManager.
+ *   </li>
+ *   <li>
+ *     When replacing an existing segment, the updates applied to the new 
segment won't be reflected in the replaced
+ *     segment.
+ *   </li>
+ * </ul>
  */
 @ThreadSafe
 public class PartitionUpsertMetadataManager {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PartitionUpsertMetadataManager.class);
 
   // TODO(upsert): consider an off-heap KV store to persist this index to 
improve the recovery speed.
-  private final ConcurrentHashMap<PrimaryKey, RecordLocation> 
_primaryKeyToRecordLocationMap =
-      new ConcurrentHashMap<>();
-  // the mapping between the (sealed) segment and its validDocuments
-  private final ConcurrentHashMap<String, ThreadSafeMutableRoaringBitmap> 
_segmentToValidDocIdsMap =
-      new ConcurrentHashMap<>();
+  @VisibleForTesting
+  final ConcurrentHashMap<PrimaryKey, RecordLocation> 
_primaryKeyToRecordLocationMap = new ConcurrentHashMap<>();
 
   /**
-   * Creates the valid doc ids for the given (immutable) segment.
+   * Initializes the upsert metadata for the given immutable segment and returns 
the valid doc ids for the segment.
    */
-  public ThreadSafeMutableRoaringBitmap createValidDocIds(String segmentName) {
-    LOGGER.info("Creating valid doc ids for segment: {}", segmentName);
+  public ThreadSafeMutableRoaringBitmap addSegment(String segmentName, 
Iterator<RecordInfo> recordInfoIterator) {
+    LOGGER.info("Adding upsert metadata for segment: {}", segmentName);
+
     ThreadSafeMutableRoaringBitmap validDocIds = new 
ThreadSafeMutableRoaringBitmap();
-    if (_segmentToValidDocIdsMap.put(segmentName, validDocIds) != null) {
-      LOGGER.warn("Valid doc ids exist for segment: {}, replacing it", 
segmentName);
+    while (recordInfoIterator.hasNext()) {
+      RecordInfo recordInfo = recordInfoIterator.next();
+      _primaryKeyToRecordLocationMap.compute(recordInfo._primaryKey, 
(primaryKey, currentRecordLocation) -> {
+        if (currentRecordLocation != null) {
+          // Existing primary key
+
+          if (segmentName.equals(currentRecordLocation.getSegmentName())) {
+            // The current record location has the same segment name
+
+            if (validDocIds == currentRecordLocation.getValidDocIds()) {
+              // The current record location is pointing to the new segment 
being loaded
+
+              // Update the record location when getting a newer timestamp
+              if (recordInfo._timestamp > 
currentRecordLocation.getTimestamp()) {
+                validDocIds.remove(currentRecordLocation.getDocId());
+                validDocIds.add(recordInfo._docId);
+                return new RecordLocation(segmentName, recordInfo._docId, 
recordInfo._timestamp, validDocIds);
+              }
+            } else {
+              // The current record location is pointing to the old segment 
being replaced. This could happen when
+              // committing a consuming segment, or reloading a completed 
segment.
+
+              // Update the record location when the new timestamp is greater 
than or equal to the current timestamp.
+              // Update the record location when there is a tie because the 
record locations should point to the new
+              // segment instead of the old segment being replaced. Also, do 
not update the valid doc ids for the old
+              // segment because it has not been replaced yet.
+              if (recordInfo._timestamp >= 
currentRecordLocation.getTimestamp()) {
+                validDocIds.add(recordInfo._docId);
+                return new RecordLocation(segmentName, recordInfo._docId, 
recordInfo._timestamp, validDocIds);
+              }
+            }
+            return currentRecordLocation;
+          }
+
+          // Update the record location when getting a newer timestamp
+          if (recordInfo._timestamp > currentRecordLocation.getTimestamp()) {
+            
currentRecordLocation.getValidDocIds().remove(currentRecordLocation.getDocId());
+            validDocIds.add(recordInfo._docId);
+            return new RecordLocation(segmentName, recordInfo._docId, 
recordInfo._timestamp, validDocIds);
+          } else {
+            return currentRecordLocation;
+          }
+        } else {
+          // New primary key
+          validDocIds.add(recordInfo._docId);
+          return new RecordLocation(segmentName, recordInfo._docId, 
recordInfo._timestamp, validDocIds);
+        }
+      });
     }
     return validDocIds;
   }
 
   /**
-   * Returns the valid doc ids for the given (immutable) segment.
-   */
-  public ThreadSafeMutableRoaringBitmap getValidDocIds(String segmentName) {
-    return Preconditions
-        .checkNotNull(_segmentToValidDocIdsMap.get(segmentName), "Failed to 
find valid doc ids for segment: %s",
-            segmentName);
-  }
-
-  /**
-   * Updates the record location of the given primary key if the given record 
location is newer than the current record
-   * location. Also updates the valid doc ids accordingly if the record 
location is updated.
+   * Updates the upsert metadata for a newly consumed record in the given 
consuming segment.
    */
-  public void updateRecordLocation(PrimaryKey primaryKey, RecordLocation 
recordLocation,
+  public synchronized void updateRecord(String segmentName, RecordInfo 
recordInfo,
       ThreadSafeMutableRoaringBitmap validDocIds) {
-    _primaryKeyToRecordLocationMap.compute(primaryKey, (k, v) -> {
-      if (v != null) {
+    _primaryKeyToRecordLocationMap.compute(recordInfo._primaryKey, 
(primaryKey, currentRecordLocation) -> {
+      if (currentRecordLocation != null) {
         // Existing primary key
 
-        if (recordLocation.getTimestamp() >= v.getTimestamp()) {
-          // Update the record location
-          // NOTE: Update the record location when there is a tie on the 
timestamp because during the segment
-          //       commitment, when loading the committed segment, it should 
replace the old record locations in case
-          //       the order of records changed.
-
-          // Remove the doc from the valid doc ids of the previous location
-          if (v.isConsuming()) {
-            // Previous location is a consuming segment, whose valid doc ids 
are maintained locally. Only update the
-            // valid doc ids when the update is from the same segment.
-            if (recordLocation.isConsuming() && 
recordLocation.getSegmentName().equals(v.getSegmentName())) {
-              validDocIds.remove(v.getDocId());
-            }
-          } else {
-            ThreadSafeMutableRoaringBitmap validDocIdsForPreviousLocation =
-                _segmentToValidDocIdsMap.get(v.getSegmentName());
-            if (validDocIdsForPreviousLocation != null) {
-              validDocIdsForPreviousLocation.remove(v.getDocId());
-            } else {
-              LOGGER.warn("Failed to find valid doc ids for previous location: 
{}", v.getSegmentName());
-            }
-          }
-
-          validDocIds.add(recordLocation.getDocId());
-          return recordLocation;
+        // Update the record location when getting a newer timestamp
+        if (recordInfo._timestamp > currentRecordLocation.getTimestamp()) {
+          
currentRecordLocation.getValidDocIds().remove(currentRecordLocation.getDocId());
+          validDocIds.add(recordInfo._docId);
+          return new RecordLocation(segmentName, recordInfo._docId, 
recordInfo._timestamp, validDocIds);
         } else {
-          // No need to update
-          return v;
+          return currentRecordLocation;
         }
       } else {
         // New primary key
-        validDocIds.add(recordLocation.getDocId());
-        return recordLocation;
+        validDocIds.add(recordInfo._docId);
+        return new RecordLocation(segmentName, recordInfo._docId, 
recordInfo._timestamp, validDocIds);
       }
     });
   }
 
   /**
-   * Removes the upsert metadata for the given segment.
+   * Removes the upsert metadata for the given immutable segment. No need to 
remove the upsert metadata for the
+   * consuming segment because it should be replaced by the committed segment.
    */
-  public void removeSegment(String segmentName) {
+  public synchronized void removeSegment(String segmentName, 
ThreadSafeMutableRoaringBitmap validDocIds) {
     LOGGER.info("Removing upsert metadata for segment: {}", segmentName);
-    _primaryKeyToRecordLocationMap.forEach((k, v) -> {
-      if (v.getSegmentName().equals(segmentName)) {
-        // NOTE: Check and remove to prevent removing the key that is just 
updated.
-        _primaryKeyToRecordLocationMap.remove(k, v);
-      }
-    });
-    _segmentToValidDocIdsMap.remove(segmentName);
+
+    if (!validDocIds.getMutableRoaringBitmap().isEmpty()) {

Review comment:
       Yes, it removes an entry only when the entry references the same valid doc ids 
object. It won't touch the upsert metadata for other segments.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to