yupeng9 commented on a change in pull request #6167:
URL: https://github.com/apache/incubator-pinot/pull/6167#discussion_r508991010



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManager.java
##########
@@ -29,97 +30,142 @@
 
 /**
  * Manages the upsert metadata per partition.
+ * <p>For multiple records with the same timestamp, there is no guarantee on which record to be preserved.
+ * <p>There will be short term inconsistency when updating the upsert metadata, but should be consistent after the
+ * operation is done:
+ * <ul>
+ *   <li>
+ *     When updating a new record, it first removes the doc id from the current location, then update the new location.
+ *   </li>
+ *   <li>
+ *     When adding a new segment, it removes the doc ids from the current locations before the segment being added to
+ *     the RealtimeTableDataManager.
+ *   </li>
+ *   <li>
+ *     When replacing an existing segment, the updates applied to the new segment won't be reflected to the replaced
+ *     segment.
+ *   </li>
+ * </ul>
  */
 @ThreadSafe
 public class PartitionUpsertMetadataManager {
   private static final Logger LOGGER = LoggerFactory.getLogger(PartitionUpsertMetadataManager.class);
 
   // TODO(upset): consider an off-heap KV store to persist this index to improve the recovery speed.
-  private final ConcurrentHashMap<PrimaryKey, RecordLocation> _primaryKeyToRecordLocationMap =
-      new ConcurrentHashMap<>();
-  // the mapping between the (sealed) segment and its validDocuments
-  private final ConcurrentHashMap<String, ThreadSafeMutableRoaringBitmap> _segmentToValidDocIdsMap =
-      new ConcurrentHashMap<>();
+  @VisibleForTesting
+  final ConcurrentHashMap<PrimaryKey, RecordLocation> _primaryKeyToRecordLocationMap = new ConcurrentHashMap<>();
 
   /**
-   * Creates the valid doc ids for the given (immutable) segment.
+   * Initializes the upsert metadata for the given immutable segment, returns the valid doc ids for the segment.
    */
-  public ThreadSafeMutableRoaringBitmap createValidDocIds(String segmentName) {
-    LOGGER.info("Creating valid doc ids for segment: {}", segmentName);
+  public ThreadSafeMutableRoaringBitmap addSegment(String segmentName, Iterator<RecordInfo> recordInfoIterator) {
+    LOGGER.info("Adding upsert metadata for segment: {}", segmentName);
+
     ThreadSafeMutableRoaringBitmap validDocIds = new ThreadSafeMutableRoaringBitmap();
-    if (_segmentToValidDocIdsMap.put(segmentName, validDocIds) != null) {
-      LOGGER.warn("Valid doc ids exist for segment: {}, replacing it", segmentName);
+    while (recordInfoIterator.hasNext()) {
+      RecordInfo recordInfo = recordInfoIterator.next();
+      _primaryKeyToRecordLocationMap.compute(recordInfo._primaryKey, (primaryKey, currentRecordLocation) -> {
+        if (currentRecordLocation != null) {
+          // Existing primary key
+
+          if (segmentName.equals(currentRecordLocation.getSegmentName())) {
+            // The current record location has the same segment name
+
+            if (validDocIds == currentRecordLocation.getValidDocIds()) {
+              // The current record location is pointing to the new segment being loaded
+
+              // Update the record location when getting a newer timestamp
+              if (recordInfo._timestamp > currentRecordLocation.getTimestamp()) {
+                validDocIds.remove(currentRecordLocation.getDocId());
+                validDocIds.add(recordInfo._docId);
+                return new RecordLocation(segmentName, recordInfo._docId, recordInfo._timestamp, validDocIds);
+              }
+            } else {
+              // The current record location is pointing to the old segment being replaced. This could happen when
+              // committing a consuming segment, or reloading a completed segment.
+
+              // Update the record location when the new timestamp is greater than or equal to the current timestamp.
+              // Update the record location when there is a tie because the record locations should point to the new
+              // segment instead of the old segment being replaced. Also, do not update the valid doc ids for the old
+              // segment because it has not been replaced yet.

Review comment:
       Not sure if this old-segment check is necessary:
    - Since the old segment will be replaced, it should be safe to update its valid doc ids, since it will be gone anyway?
    - If so, the handling is identical to the branch above, and can therefore be merged?
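To make the suggestion concrete, here is a minimal, self-contained sketch of what a merged `compute` body could look like. It is a simplified model, not the PR's code: `java.util.BitSet` stands in for `ThreadSafeMutableRoaringBitmap`, plain `String` keys stand in for `PrimaryKey`, and `Location` and `MergedUpsertSketch` are hypothetical names. The branches in the diff are not quite identical (`>` versus `>=`): a timestamp tie must still win when the current location points to the old copy of the same segment, so the merged condition keeps that case explicitly.

```java
import java.util.BitSet;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified stand-in for RecordLocation; BitSet replaces ThreadSafeMutableRoaringBitmap.
final class Location {
  final String segmentName;
  final int docId;
  final long timestamp;
  final BitSet validDocIds;

  Location(String segmentName, int docId, long timestamp, BitSet validDocIds) {
    this.segmentName = segmentName;
    this.docId = docId;
    this.timestamp = timestamp;
    this.validDocIds = validDocIds;
  }
}

// Hypothetical sketch: the "existing primary key" branches of addSegment folded into one condition.
final class MergedUpsertSketch {
  final Map<String, Location> locations = new ConcurrentHashMap<>();

  // keys[i] and timestamps[i] describe the record with doc id i (String keys stand in for PrimaryKey).
  BitSet addSegment(String segmentName, String[] keys, long[] timestamps) {
    BitSet validDocIds = new BitSet();
    for (int i = 0; i < keys.length; i++) {
      final int docId = i;
      final long timestamp = timestamps[i];
      locations.compute(keys[i], (key, current) -> {
        if (current == null) {
          // New primary key
          validDocIds.set(docId);
          return new Location(segmentName, docId, timestamp, validDocIds);
        }
        // A tie only wins when the current location belongs to the old copy of the segment being replaced
        boolean pointsToOldCopy =
            segmentName.equals(current.segmentName) && current.validDocIds != validDocIds;
        if (timestamp > current.timestamp || (timestamp == current.timestamp && pointsToOldCopy)) {
          // Also clears the bit in the old copy: this is the behavior change the comment proposes
          current.validDocIds.clear(current.docId);
          validDocIds.set(docId);
          return new Location(segmentName, docId, timestamp, validDocIds);
        }
        return current;
      });
    }
    return validDocIds;
  }
}
```

Replaying the test data from `PartitionUpsertMetadataManagerTest` through this sketch gives the same valid doc ids for the new copy; the difference is that the old copy's bitmap ends up emptied. The trade-off is the one the code comment in the diff points at: clearing bits in the old copy before it is swapped out means queries that still hit the old copy may briefly miss those records.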

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManager.java
##########
@@ -29,97 +30,142 @@
 
 /**
  * Manages the upsert metadata per partition.
+ * <p>For multiple records with the same timestamp, there is no guarantee on which record to be preserved.
+ * <p>There will be short term inconsistency when updating the upsert metadata, but should be consistent after the
+ * operation is done:
+ * <ul>
+ *   <li>
+ *     When updating a new record, it first removes the doc id from the current location, then update the new location.
+ *   </li>
+ *   <li>
+ *     When adding a new segment, it removes the doc ids from the current locations before the segment being added to
+ *     the RealtimeTableDataManager.
+ *   </li>
+ *   <li>
+ *     When replacing an existing segment, the updates applied to the new segment won't be reflected to the replaced
+ *     segment.
+ *   </li>
+ * </ul>
  */
 @ThreadSafe
 public class PartitionUpsertMetadataManager {
   private static final Logger LOGGER = LoggerFactory.getLogger(PartitionUpsertMetadataManager.class);
 
   // TODO(upset): consider an off-heap KV store to persist this index to improve the recovery speed.
-  private final ConcurrentHashMap<PrimaryKey, RecordLocation> _primaryKeyToRecordLocationMap =
-      new ConcurrentHashMap<>();
-  // the mapping between the (sealed) segment and its validDocuments
-  private final ConcurrentHashMap<String, ThreadSafeMutableRoaringBitmap> _segmentToValidDocIdsMap =
-      new ConcurrentHashMap<>();
+  @VisibleForTesting
+  final ConcurrentHashMap<PrimaryKey, RecordLocation> _primaryKeyToRecordLocationMap = new ConcurrentHashMap<>();
 
   /**
-   * Creates the valid doc ids for the given (immutable) segment.
+   * Initializes the upsert metadata for the given immutable segment, returns the valid doc ids for the segment.
    */
-  public ThreadSafeMutableRoaringBitmap createValidDocIds(String segmentName) {
-    LOGGER.info("Creating valid doc ids for segment: {}", segmentName);
+  public ThreadSafeMutableRoaringBitmap addSegment(String segmentName, Iterator<RecordInfo> recordInfoIterator) {
+    LOGGER.info("Adding upsert metadata for segment: {}", segmentName);
+
     ThreadSafeMutableRoaringBitmap validDocIds = new ThreadSafeMutableRoaringBitmap();
-    if (_segmentToValidDocIdsMap.put(segmentName, validDocIds) != null) {
-      LOGGER.warn("Valid doc ids exist for segment: {}, replacing it", segmentName);
+    while (recordInfoIterator.hasNext()) {
+      RecordInfo recordInfo = recordInfoIterator.next();
+      _primaryKeyToRecordLocationMap.compute(recordInfo._primaryKey, (primaryKey, currentRecordLocation) -> {
+        if (currentRecordLocation != null) {
+          // Existing primary key
+
+          if (segmentName.equals(currentRecordLocation.getSegmentName())) {
+            // The current record location has the same segment name
+
+            if (validDocIds == currentRecordLocation.getValidDocIds()) {
+              // The current record location is pointing to the new segment being loaded
+
+              // Update the record location when getting a newer timestamp
+              if (recordInfo._timestamp > currentRecordLocation.getTimestamp()) {
+                validDocIds.remove(currentRecordLocation.getDocId());
+                validDocIds.add(recordInfo._docId);
+                return new RecordLocation(segmentName, recordInfo._docId, recordInfo._timestamp, validDocIds);
+              }
+            } else {
+              // The current record location is pointing to the old segment being replaced. This could happen when
+              // committing a consuming segment, or reloading a completed segment.
+
+              // Update the record location when the new timestamp is greater than or equal to the current timestamp.
+              // Update the record location when there is a tie because the record locations should point to the new
+              // segment instead of the old segment being replaced. Also, do not update the valid doc ids for the old
+              // segment because it has not been replaced yet.
+              if (recordInfo._timestamp >= currentRecordLocation.getTimestamp()) {
+                validDocIds.add(recordInfo._docId);
+                return new RecordLocation(segmentName, recordInfo._docId, recordInfo._timestamp, validDocIds);
+              }
+            }
+            return currentRecordLocation;
+          }
+
+          // Update the record location when getting a newer timestamp

Review comment:
       Wrap this in an `else` branch (of the `segmentName.equals(...)` check) for better readability.

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManager.java
##########
@@ -29,97 +30,142 @@
 
 /**
  * Manages the upsert metadata per partition.
+ * <p>For multiple records with the same timestamp, there is no guarantee on which record to be preserved.
+ * <p>There will be short term inconsistency when updating the upsert metadata, but should be consistent after the
+ * operation is done:
+ * <ul>
+ *   <li>
+ *     When updating a new record, it first removes the doc id from the current location, then update the new location.
+ *   </li>
+ *   <li>
+ *     When adding a new segment, it removes the doc ids from the current locations before the segment being added to
+ *     the RealtimeTableDataManager.
+ *   </li>
+ *   <li>
+ *     When replacing an existing segment, the updates applied to the new segment won't be reflected to the replaced

Review comment:
       It's worth explaining a bit more which data structures won't be reflected.

##########
File path: pinot-core/src/test/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManagerTest.java
##########
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.upsert;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.core.realtime.impl.ThreadSafeMutableRoaringBitmap;
+import org.apache.pinot.core.upsert.PartitionUpsertMetadataManager.RecordInfo;
+import org.apache.pinot.spi.data.readers.PrimaryKey;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertSame;
+
+
+public class PartitionUpsertMetadataManagerTest {
+  private static final String SEGMENT_PREFIX = "testSegment";
+
+  @Test
+  public void testAddSegment() {
+    PartitionUpsertMetadataManager upsertMetadataManager = new PartitionUpsertMetadataManager();
+    Map<PrimaryKey, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
+
+    // Add the first segment
+    String segment1 = SEGMENT_PREFIX + 1;
+    List<RecordInfo> recordInfoList1 = new ArrayList<>();
+    recordInfoList1.add(new RecordInfo(getPrimaryKey(0), 0, 100));
+    recordInfoList1.add(new RecordInfo(getPrimaryKey(1), 1, 100));
+    recordInfoList1.add(new RecordInfo(getPrimaryKey(2), 2, 100));
+    recordInfoList1.add(new RecordInfo(getPrimaryKey(0), 3, 80));
+    recordInfoList1.add(new RecordInfo(getPrimaryKey(1), 4, 120));
+    recordInfoList1.add(new RecordInfo(getPrimaryKey(0), 5, 100));
+    ThreadSafeMutableRoaringBitmap validDocIds1 =
+        upsertMetadataManager.addSegment(segment1, recordInfoList1.iterator());
+    // segment1: 0 -> {0, 100}, 1 -> {4, 120}, 2 -> {2, 100}
+    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100);
+    checkRecordLocation(recordLocationMap, 1, segment1, 4, 120);
+    checkRecordLocation(recordLocationMap, 2, segment1, 2, 100);
+    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 4});
+
+    // Add the second segment
+    String segment2 = SEGMENT_PREFIX + 2;
+    List<RecordInfo> recordInfoList2 = new ArrayList<>();
+    recordInfoList2.add(new RecordInfo(getPrimaryKey(0), 0, 100));
+    recordInfoList2.add(new RecordInfo(getPrimaryKey(1), 1, 100));
+    recordInfoList2.add(new RecordInfo(getPrimaryKey(2), 2, 120));
+    recordInfoList2.add(new RecordInfo(getPrimaryKey(3), 3, 80));
+    recordInfoList2.add(new RecordInfo(getPrimaryKey(0), 4, 80));
+    ThreadSafeMutableRoaringBitmap validDocIds2 =
+        upsertMetadataManager.addSegment(segment2, recordInfoList2.iterator());
+    // segment1: 0 -> {0, 100}, 1 -> {4, 120}
+    // segment2: 2 -> {2, 120}, 3 -> {3, 80}
+    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100);
+    checkRecordLocation(recordLocationMap, 1, segment1, 4, 120);
+    checkRecordLocation(recordLocationMap, 2, segment2, 2, 120);
+    checkRecordLocation(recordLocationMap, 3, segment2, 3, 80);
+    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 4});
+    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{2, 3});
+
+    // Replace (reload) the first segment
+    ThreadSafeMutableRoaringBitmap newValidDocIds1 =
+        upsertMetadataManager.addSegment(segment1, recordInfoList1.iterator());
+    // original segment1: 0 -> {0, 100}, 1 -> {4, 120}

Review comment:
       Shall we include the removal as part of the replace? That is, the old segment should be removed only after the new one has been added?
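A minimal sketch of a replace that adds the new copy first and only then removes whatever still points to the old copy. It assumes simplifications that are not in the PR: `java.util.BitSet` stands in for `ThreadSafeMutableRoaringBitmap`, `String` keys stand in for `PrimaryKey`, and `Loc`, `ReplaceSketch` and `replaceSegment` are hypothetical names. Removal is keyed on the identity of the old copy's valid doc ids, in the spirit of the `removeSegment(String, ThreadSafeMutableRoaringBitmap)` signature in the diff.

```java
import java.util.BitSet;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified stand-in for RecordLocation; BitSet replaces ThreadSafeMutableRoaringBitmap.
final class Loc {
  final String segmentName;
  final int docId;
  final long timestamp;
  final BitSet validDocIds;

  Loc(String segmentName, int docId, long timestamp, BitSet validDocIds) {
    this.segmentName = segmentName;
    this.docId = docId;
    this.timestamp = timestamp;
    this.validDocIds = validDocIds;
  }
}

final class ReplaceSketch {
  final Map<String, Loc> locations = new ConcurrentHashMap<>();

  // Mirrors the rules in the diff: a newer timestamp wins, and a tie also wins against the old copy of the
  // same segment, whose valid doc ids are left untouched until it is removed.
  BitSet addSegment(String segmentName, String[] keys, long[] timestamps) {
    BitSet validDocIds = new BitSet();
    for (int i = 0; i < keys.length; i++) {
      final int docId = i;
      final long timestamp = timestamps[i];
      locations.compute(keys[i], (key, current) -> {
        boolean oldCopy = current != null && segmentName.equals(current.segmentName)
            && current.validDocIds != validDocIds;
        if (current == null || timestamp > current.timestamp || (oldCopy && timestamp == current.timestamp)) {
          if (current != null && !oldCopy) {
            current.validDocIds.clear(current.docId);
          }
          validDocIds.set(docId);
          return new Loc(segmentName, docId, timestamp, validDocIds);
        }
        return current;
      });
    }
    return validDocIds;
  }

  // Drops only the entries that still point to the given copy (compared by identity).
  void removeSegment(BitSet validDocIds) {
    locations.forEach((key, location) -> {
      if (location.validDocIds == validDocIds) {
        // Conditional remove, so an entry updated in the meantime is kept
        locations.remove(key, location);
      }
    });
  }

  // Replace = add the new copy first, then clean up whatever still points to the old copy.
  BitSet replaceSegment(String segmentName, String[] keys, long[] timestamps, BitSet oldValidDocIds) {
    BitSet newValidDocIds = addSegment(segmentName, keys, timestamps);
    removeSegment(oldValidDocIds);
    return newValidDocIds;
  }
}
```

With this ordering, every key present in the new copy is already remapped before cleanup starts, so cleanup cannot touch it; only keys that existed solely in the old copy are dropped.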

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/upsert/RecordLocation.java
##########
@@ -28,13 +28,13 @@
   private final String _segmentName;
   private final int _docId;
   private final long _timestamp;
-  private final boolean _isConsuming;
+  private final ThreadSafeMutableRoaringBitmap _validDocIds;
 
-  public RecordLocation(String segmentName, int docId, long timestamp, boolean isConsuming) {
+  public RecordLocation(String segmentName, int docId, long timestamp, ThreadSafeMutableRoaringBitmap validDocIds) {

Review comment:
       Not sure if `ThreadSafeMutableRoaringBitmap` is the best identifier of the containing segment. Perhaps use the segmentImpl itself, in case the `ThreadSafeMutableRoaringBitmap` itself gets replaced?

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/ImmutableSegmentImpl.java
##########
@@ -50,25 +51,29 @@
   private final SegmentMetadataImpl _segmentMetadata;
   private final Map<String, ColumnIndexContainer> _indexContainerMap;
   private final StarTreeIndexContainer _starTreeIndexContainer;
-  private final PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
-  private final ValidDocIndexReader _validDocIndex;
+
+  // For upsert
+  private PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
+  private ThreadSafeMutableRoaringBitmap _validDocIds;
+  private ValidDocIndexReader _validDocIndex;
 
   public ImmutableSegmentImpl(SegmentDirectory segmentDirectory, SegmentMetadataImpl segmentMetadata,
       Map<String, ColumnIndexContainer> columnIndexContainerMap,
-      @Nullable StarTreeIndexContainer starTreeIndexContainer,
-      @Nullable PartitionUpsertMetadataManager partitionUpsertMetadataManager) {
+      @Nullable StarTreeIndexContainer starTreeIndexContainer) {
     _segmentDirectory = segmentDirectory;
     _segmentMetadata = segmentMetadata;
     _indexContainerMap = columnIndexContainerMap;
     _starTreeIndexContainer = starTreeIndexContainer;
-    if (partitionUpsertMetadataManager != null) {
-      _partitionUpsertMetadataManager = partitionUpsertMetadataManager;
-      _validDocIndex =
-          new ValidDocIndexReaderImpl(partitionUpsertMetadataManager.createValidDocIds(getSegmentName()));
-    } else {
-      _partitionUpsertMetadataManager = null;
-      _validDocIndex = null;
-    }
+  }
+
+  /**
+   * Enables upsert for this segment.
+   */
+  public void enableUpsert(PartitionUpsertMetadataManager partitionUpsertMetadataManager,
+      ThreadSafeMutableRoaringBitmap validDocIds) {
+    _partitionUpsertMetadataManager = partitionUpsertMetadataManager;
+    _validDocIds = validDocIds;
+    _validDocIndex = new ValidDocIndexReaderImpl(validDocIds);

Review comment:
       Is it possible that this immutable segment is queried before `enableUpsert` is invoked?
   
   If so, `_validDocIndex` will be null and confuse the query plan.
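If the answer is yes, one option is to make the ordering part of the contract: enable upsert before the segment is published to queries. The sketch below is hypothetical throughout (`SegmentSketch`, `SegmentRegistry`, `addUpsertSegment` and `acquire` are not Pinot APIs), with `java.util.BitSet` standing in for the valid doc index. It relies on the `java.util.concurrent` guarantee that actions before placing an object into a concurrent collection happen-before a later retrieval of that object in another thread.

```java
import java.util.BitSet;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for ImmutableSegmentImpl; BitSet stands in for the valid doc index.
final class SegmentSketch {
  final String name;
  private BitSet validDocIds; // null until enableUpsert is called

  SegmentSketch(String name) {
    this.name = name;
  }

  void enableUpsert(BitSet validDocIds) {
    this.validDocIds = validDocIds;
  }

  BitSet getValidDocIds() {
    return validDocIds;
  }
}

// Hypothetical registry of the segments that queries can see.
final class SegmentRegistry {
  private final Map<String, SegmentSketch> queryable = new ConcurrentHashMap<>();

  // Enable upsert first, then publish. Queries reach the segment only through this map, and the put
  // happens-before a later get of the same entry, so readers see the index set by enableUpsert.
  void addUpsertSegment(SegmentSketch segment, BitSet validDocIds) {
    segment.enableUpsert(validDocIds);
    queryable.put(segment.name, segment);
  }

  SegmentSketch acquire(String name) {
    return queryable.get(name);
  }
}
```

The alternative, null-checking `_validDocIndex` in the query path, would silently treat a not-yet-enabled upsert segment as if every doc were valid, which is why ordering the calls seems the safer direction.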

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManager.java
##########
@@ -29,97 +30,142 @@
 
 /**
  * Manages the upsert metadata per partition.
+ * <p>For multiple records with the same timestamp, there is no guarantee on which record to be preserved.
+ * <p>There will be short term inconsistency when updating the upsert metadata, but should be consistent after the
+ * operation is done:
+ * <ul>
+ *   <li>
+ *     When updating a new record, it first removes the doc id from the current location, then update the new location.
+ *   </li>
+ *   <li>
+ *     When adding a new segment, it removes the doc ids from the current locations before the segment being added to
+ *     the RealtimeTableDataManager.
+ *   </li>
+ *   <li>
+ *     When replacing an existing segment, the updates applied to the new segment won't be reflected to the replaced
+ *     segment.
+ *   </li>
+ * </ul>
  */
 @ThreadSafe
 public class PartitionUpsertMetadataManager {
   private static final Logger LOGGER = LoggerFactory.getLogger(PartitionUpsertMetadataManager.class);
 
   // TODO(upset): consider an off-heap KV store to persist this index to improve the recovery speed.
-  private final ConcurrentHashMap<PrimaryKey, RecordLocation> _primaryKeyToRecordLocationMap =
-      new ConcurrentHashMap<>();
-  // the mapping between the (sealed) segment and its validDocuments
-  private final ConcurrentHashMap<String, ThreadSafeMutableRoaringBitmap> _segmentToValidDocIdsMap =
-      new ConcurrentHashMap<>();
+  @VisibleForTesting
+  final ConcurrentHashMap<PrimaryKey, RecordLocation> _primaryKeyToRecordLocationMap = new ConcurrentHashMap<>();
 
   /**
-   * Creates the valid doc ids for the given (immutable) segment.
+   * Initializes the upsert metadata for the given immutable segment, returns the valid doc ids for the segment.
    */
-  public ThreadSafeMutableRoaringBitmap createValidDocIds(String segmentName) {
-    LOGGER.info("Creating valid doc ids for segment: {}", segmentName);
+  public ThreadSafeMutableRoaringBitmap addSegment(String segmentName, Iterator<RecordInfo> recordInfoIterator) {
+    LOGGER.info("Adding upsert metadata for segment: {}", segmentName);
+
     ThreadSafeMutableRoaringBitmap validDocIds = new ThreadSafeMutableRoaringBitmap();
-    if (_segmentToValidDocIdsMap.put(segmentName, validDocIds) != null) {
-      LOGGER.warn("Valid doc ids exist for segment: {}, replacing it", segmentName);
+    while (recordInfoIterator.hasNext()) {
+      RecordInfo recordInfo = recordInfoIterator.next();
+      _primaryKeyToRecordLocationMap.compute(recordInfo._primaryKey, (primaryKey, currentRecordLocation) -> {
+        if (currentRecordLocation != null) {
+          // Existing primary key
+
+          if (segmentName.equals(currentRecordLocation.getSegmentName())) {
+            // The current record location has the same segment name
+
+            if (validDocIds == currentRecordLocation.getValidDocIds()) {
+              // The current record location is pointing to the new segment being loaded
+
+              // Update the record location when getting a newer timestamp
+              if (recordInfo._timestamp > currentRecordLocation.getTimestamp()) {
+                validDocIds.remove(currentRecordLocation.getDocId());
+                validDocIds.add(recordInfo._docId);
+                return new RecordLocation(segmentName, recordInfo._docId, recordInfo._timestamp, validDocIds);
+              }
+            } else {
+              // The current record location is pointing to the old segment being replaced. This could happen when
+              // committing a consuming segment, or reloading a completed segment.
+
+              // Update the record location when the new timestamp is greater than or equal to the current timestamp.
+              // Update the record location when there is a tie because the record locations should point to the new
+              // segment instead of the old segment being replaced. Also, do not update the valid doc ids for the old
+              // segment because it has not been replaced yet.
+              if (recordInfo._timestamp >= currentRecordLocation.getTimestamp()) {
+                validDocIds.add(recordInfo._docId);
+                return new RecordLocation(segmentName, recordInfo._docId, recordInfo._timestamp, validDocIds);
+              }
+            }
+            return currentRecordLocation;
+          }
+
+          // Update the record location when getting a newer timestamp
+          if (recordInfo._timestamp > currentRecordLocation.getTimestamp()) {
+            currentRecordLocation.getValidDocIds().remove(currentRecordLocation.getDocId());
+            validDocIds.add(recordInfo._docId);
+            return new RecordLocation(segmentName, recordInfo._docId, recordInfo._timestamp, validDocIds);
+          } else {
+            return currentRecordLocation;
+          }
+        } else {
+          // New primary key
+          validDocIds.add(recordInfo._docId);
+          return new RecordLocation(segmentName, recordInfo._docId, recordInfo._timestamp, validDocIds);
+        }
+      });
     }
     return validDocIds;
   }
 
   /**
-   * Returns the valid doc ids for the given (immutable) segment.
-   */
-  public ThreadSafeMutableRoaringBitmap getValidDocIds(String segmentName) {
-    return Preconditions
-        .checkNotNull(_segmentToValidDocIdsMap.get(segmentName), "Failed to find valid doc ids for segment: %s",
-            segmentName);
-  }
-
-  /**
-   * Updates the record location of the given primary key if the given record location is newer than the current record
-   * location. Also updates the valid doc ids accordingly if the record location is updated.
+   * Updates the upsert metadata for a new consumed record in the given consuming segment.
    */
-  public void updateRecordLocation(PrimaryKey primaryKey, RecordLocation recordLocation,
+  public synchronized void updateRecord(String segmentName, RecordInfo recordInfo,
       ThreadSafeMutableRoaringBitmap validDocIds) {
-    _primaryKeyToRecordLocationMap.compute(primaryKey, (k, v) -> {
-      if (v != null) {
+    _primaryKeyToRecordLocationMap.compute(recordInfo._primaryKey, (primaryKey, currentRecordLocation) -> {
+      if (currentRecordLocation != null) {
         // Existing primary key
 
-        if (recordLocation.getTimestamp() >= v.getTimestamp()) {
-          // Update the record location
-          // NOTE: Update the record location when there is a tie on the timestamp because during the segment
-          //       commitment, when loading the committed segment, it should replace the old record locations in case
-          //       the order of records changed.
-
-          // Remove the doc from the valid doc ids of the previous location
-          if (v.isConsuming()) {
-            // Previous location is a consuming segment, whose valid doc ids are maintained locally. Only update the
-            // valid doc ids when the update is from the same segment.
-            if (recordLocation.isConsuming() && recordLocation.getSegmentName().equals(v.getSegmentName())) {
-              validDocIds.remove(v.getDocId());
-            }
-          } else {
-            ThreadSafeMutableRoaringBitmap validDocIdsForPreviousLocation =
-                _segmentToValidDocIdsMap.get(v.getSegmentName());
-            if (validDocIdsForPreviousLocation != null) {
-              validDocIdsForPreviousLocation.remove(v.getDocId());
-            } else {
-              LOGGER.warn("Failed to find valid doc ids for previous location: {}", v.getSegmentName());
-            }
-          }
-
-          validDocIds.add(recordLocation.getDocId());
-          return recordLocation;
+        // Update the record location when getting a newer timestamp
+        if (recordInfo._timestamp > currentRecordLocation.getTimestamp()) {
+          currentRecordLocation.getValidDocIds().remove(currentRecordLocation.getDocId());
+          validDocIds.add(recordInfo._docId);
+          return new RecordLocation(segmentName, recordInfo._docId, recordInfo._timestamp, validDocIds);
         } else {
-          // No need to update
-          return v;
+          return currentRecordLocation;
         }
       } else {
         // New primary key
-        validDocIds.add(recordLocation.getDocId());
-        return recordLocation;
+        validDocIds.add(recordInfo._docId);
+        return new RecordLocation(segmentName, recordInfo._docId, recordInfo._timestamp, validDocIds);
       }
     });
   }
 
   /**
-   * Removes the upsert metadata for the given segment.
+   * Removes the upsert metadata for the given immutable segment. No need to remove the upsert metadata for the
+   * consuming segment because it should be replaced by the committed segment.
    */
-  public void removeSegment(String segmentName) {
+  public synchronized void removeSegment(String segmentName, ThreadSafeMutableRoaringBitmap validDocIds) {
     LOGGER.info("Removing upsert metadata for segment: {}", segmentName);
-    _primaryKeyToRecordLocationMap.forEach((k, v) -> {
-      if (v.getSegmentName().equals(segmentName)) {
-        // NOTE: Check and remove to prevent removing the key that is just updated.
-        _primaryKeyToRecordLocationMap.remove(k, v);
-      }
-    });
-    _segmentToValidDocIdsMap.remove(segmentName);
+
+    if (!validDocIds.getMutableRoaringBitmap().isEmpty()) {

Review comment:
       Does this check handle the case where a replaced segment should not remove the keys of the newly loaded one? Perhaps we should track the current segmentImpl (and its corresponding `validDocIds`) for each segment name?
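A minimal sketch of that tracking idea, assuming a hypothetical `CurrentCopyTracker` that is not part of the PR, with `java.util.BitSet` standing in for the valid doc ids. It lets a removal tell a stale (already replaced) copy from the current one, instead of inferring that from whether the bitmap is empty. Identity (`==`) is used on purpose: `ConcurrentHashMap.remove(key, value)` compares values with `equals`, and `BitSet.equals` compares contents, so two copies with the same bits would be confused.

```java
import java.util.BitSet;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical tracker of which copy (identified by its valid doc ids) is current for each segment name.
final class CurrentCopyTracker {
  private final Map<String, BitSet> currentCopies = new ConcurrentHashMap<>();

  // Records the newly added copy; returns the copy it replaced, or null for a brand-new segment.
  BitSet onAdd(String segmentName, BitSet validDocIds) {
    return currentCopies.put(segmentName, validDocIds);
  }

  // Returns true (and forgets the name) only if the given copy is still the current one.
  // A stale copy returns false, signalling that keys of the newer copy must not be touched.
  boolean onRemove(String segmentName, BitSet validDocIds) {
    AtomicBoolean removed = new AtomicBoolean(false);
    currentCopies.computeIfPresent(segmentName, (name, current) -> {
      if (current == validDocIds) {
        removed.set(true);
        return null; // returning null from computeIfPresent removes the mapping
      }
      return current;
    });
    return removed.get();
  }
}
```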

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManager.java
##########
@@ -29,97 +30,142 @@
 
 /**
  * Manages the upsert metadata per partition.
+ * <p>For multiple records with the same timestamp, there is no guarantee on which record to be preserved.
+ * <p>There will be short term inconsistency when updating the upsert metadata, but should be consistent after the
+ * operation is done:
+ * <ul>
+ *   <li>
+ *     When updating a new record, it first removes the doc id from the current location, then update the new location.
+ *   </li>
+ *   <li>
+ *     When adding a new segment, it removes the doc ids from the current locations before the segment being added to
+ *     the RealtimeTableDataManager.
+ *   </li>
+ *   <li>
+ *     When replacing an existing segment, the updates applied to the new segment won't be reflected to the replaced
+ *     segment.
+ *   </li>
+ * </ul>
  */
 @ThreadSafe
 public class PartitionUpsertMetadataManager {
   private static final Logger LOGGER = LoggerFactory.getLogger(PartitionUpsertMetadataManager.class);
 
   // TODO(upset): consider an off-heap KV store to persist this index to improve the recovery speed.
-  private final ConcurrentHashMap<PrimaryKey, RecordLocation> _primaryKeyToRecordLocationMap =
-      new ConcurrentHashMap<>();
-  // the mapping between the (sealed) segment and its validDocuments
-  private final ConcurrentHashMap<String, ThreadSafeMutableRoaringBitmap> _segmentToValidDocIdsMap =
-      new ConcurrentHashMap<>();
+  @VisibleForTesting
+  final ConcurrentHashMap<PrimaryKey, RecordLocation> _primaryKeyToRecordLocationMap = new ConcurrentHashMap<>();
 
   /**
-   * Creates the valid doc ids for the given (immutable) segment.
+   * Initializes the upsert metadata for the given immutable segment, returns the valid doc ids for the segment.
    */
-  public ThreadSafeMutableRoaringBitmap createValidDocIds(String segmentName) {
-    LOGGER.info("Creating valid doc ids for segment: {}", segmentName);
+  public ThreadSafeMutableRoaringBitmap addSegment(String segmentName, Iterator<RecordInfo> recordInfoIterator) {
+    LOGGER.info("Adding upsert metadata for segment: {}", segmentName);
+
     ThreadSafeMutableRoaringBitmap validDocIds = new ThreadSafeMutableRoaringBitmap();
-    if (_segmentToValidDocIdsMap.put(segmentName, validDocIds) != null) {
-      LOGGER.warn("Valid doc ids exist for segment: {}, replacing it", segmentName);
+    while (recordInfoIterator.hasNext()) {
+      RecordInfo recordInfo = recordInfoIterator.next();
+      _primaryKeyToRecordLocationMap.compute(recordInfo._primaryKey, (primaryKey, currentRecordLocation) -> {
+        if (currentRecordLocation != null) {
+          // Existing primary key
+
+          if (segmentName.equals(currentRecordLocation.getSegmentName())) {
+            // The current record location has the same segment name
+
+            if (validDocIds == currentRecordLocation.getValidDocIds()) {
+              // The current record location is pointing to the new segment being loaded
+
+              // Update the record location when getting a newer timestamp
+              if (recordInfo._timestamp > currentRecordLocation.getTimestamp()) {
+                validDocIds.remove(currentRecordLocation.getDocId());
+                validDocIds.add(recordInfo._docId);
+                return new RecordLocation(segmentName, recordInfo._docId, recordInfo._timestamp, validDocIds);
+              }
+            } else {
+              // The current record location is pointing to the old segment being replaced. This could happen when
+              // committing a consuming segment, or reloading a completed segment.
+
+              // Update the record location when the new timestamp is greater than or equal to the current timestamp.
+              // Update the record location when there is a tie because the record locations should point to the new
+              // segment instead of the old segment being replaced. Also, do not update the valid doc ids for the old
+              // segment because it has not been replaced yet.
+              if (recordInfo._timestamp >= currentRecordLocation.getTimestamp()) {
+                validDocIds.add(recordInfo._docId);
+                return new RecordLocation(segmentName, recordInfo._docId, recordInfo._timestamp, validDocIds);
+              }
+            }
+            return currentRecordLocation;
+          }
+
+          // Update the record location when getting a newer timestamp
+          if (recordInfo._timestamp > currentRecordLocation.getTimestamp()) {
+            currentRecordLocation.getValidDocIds().remove(currentRecordLocation.getDocId());
+            validDocIds.add(recordInfo._docId);
+            return new RecordLocation(segmentName, recordInfo._docId, recordInfo._timestamp, validDocIds);
+          } else {
+            return currentRecordLocation;
+          }
+        } else {
+          // New primary key
+          validDocIds.add(recordInfo._docId);
+          return new RecordLocation(segmentName, recordInfo._docId, recordInfo._timestamp, validDocIds);
+        }
+      });
     }
     return validDocIds;
   }
 
   /**
-   * Returns the valid doc ids for the given (immutable) segment.
-   */
-  public ThreadSafeMutableRoaringBitmap getValidDocIds(String segmentName) {
-    return Preconditions
-        .checkNotNull(_segmentToValidDocIdsMap.get(segmentName), "Failed to find valid doc ids for segment: %s",
-            segmentName);
-  }
-
-  /**
-   * Updates the record location of the given primary key if the given record location is newer than the current record
-   * location. Also updates the valid doc ids accordingly if the record location is updated.
+   * Updates the upsert metadata for a new consumed record in the given consuming segment.
    */
-  public void updateRecordLocation(PrimaryKey primaryKey, RecordLocation recordLocation,
+  public synchronized void updateRecord(String segmentName, RecordInfo recordInfo,
       ThreadSafeMutableRoaringBitmap validDocIds) {
-    _primaryKeyToRecordLocationMap.compute(primaryKey, (k, v) -> {
-      if (v != null) {
+    _primaryKeyToRecordLocationMap.compute(recordInfo._primaryKey, (primaryKey, currentRecordLocation) -> {
+      if (currentRecordLocation != null) {
         // Existing primary key
 
-        if (recordLocation.getTimestamp() >= v.getTimestamp()) {
-          // Update the record location
-          // NOTE: Update the record location when there is a tie on the timestamp because during the segment
-          //       commitment, when loading the committed segment, it should replace the old record locations in case
-          //       the order of records changed.
-
-          // Remove the doc from the valid doc ids of the previous location
-          if (v.isConsuming()) {
-            // Previous location is a consuming segment, whose valid doc ids are maintained locally. Only update the
-            // valid doc ids when the update is from the same segment.
-            if (recordLocation.isConsuming() && recordLocation.getSegmentName().equals(v.getSegmentName())) {
-              validDocIds.remove(v.getDocId());
-            }
-          } else {
-            ThreadSafeMutableRoaringBitmap validDocIdsForPreviousLocation =
-                _segmentToValidDocIdsMap.get(v.getSegmentName());
-            if (validDocIdsForPreviousLocation != null) {
-              validDocIdsForPreviousLocation.remove(v.getDocId());
-            } else {
-              LOGGER.warn("Failed to find valid doc ids for previous location: {}", v.getSegmentName());
-            }
-          }
-
-          validDocIds.add(recordLocation.getDocId());
-          return recordLocation;
+        // Update the record location when getting a newer timestamp
+        if (recordInfo._timestamp > currentRecordLocation.getTimestamp()) {
+          currentRecordLocation.getValidDocIds().remove(currentRecordLocation.getDocId());
+          validDocIds.add(recordInfo._docId);
+          return new RecordLocation(segmentName, recordInfo._docId, recordInfo._timestamp, validDocIds);
         } else {
-          // No need to update
-          return v;
+          return currentRecordLocation;
         }
       } else {
         // New primary key
-        validDocIds.add(recordLocation.getDocId());
-        return recordLocation;
+        validDocIds.add(recordInfo._docId);
+        return new RecordLocation(segmentName, recordInfo._docId, recordInfo._timestamp, validDocIds);
       }
     });
   }
 
   /**
-   * Removes the upsert metadata for the given segment.
+   * Removes the upsert metadata for the given immutable segment. No need to remove the upsert metadata for the
+   * consuming segment because it should be replaced by the committed segment.

Review comment:
       How is the consuming segment related?
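To make the question concrete, here is a minimal Java sketch of how the new segment-loading loop and `updateRecord` resolve a primary key, as we read the diff. It is not Pinot code: it assumes `String` primary keys, uses `java.util.BitSet` in place of `ThreadSafeMutableRoaringBitmap` (so it is not thread-safe on its own), and `UpsertMetadataSketch`, `Location`, `addSegmentRecord` and `newerWins` are hypothetical names.

```java
import java.util.BitSet;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified model of the per-partition upsert bookkeeping in the diff.
// Assumptions: String primary keys; BitSet stands in for ThreadSafeMutableRoaringBitmap
// (BitSet is not thread-safe, so this is for reasoning about the rules, not concurrency).
class UpsertMetadataSketch {
  // Hypothetical stand-in for RecordLocation.
  static final class Location {
    final String segmentName;
    final int docId;
    final long timestamp;
    final BitSet validDocIds;

    Location(String segmentName, int docId, long timestamp, BitSet validDocIds) {
      this.segmentName = segmentName;
      this.docId = docId;
      this.timestamp = timestamp;
      this.validDocIds = validDocIds;
    }
  }

  final ConcurrentHashMap<String, Location> keyToLocation = new ConcurrentHashMap<>();

  // One record of an immutable segment being loaded; validDocIds belongs to the new segment.
  void addSegmentRecord(String segmentName, BitSet validDocIds, String key, int docId, long timestamp) {
    keyToLocation.compute(key, (k, current) -> {
      if (current == null) {
        // New primary key
        validDocIds.set(docId);
        return new Location(segmentName, docId, timestamp, validDocIds);
      }
      if (segmentName.equals(current.segmentName)) {
        if (current.validDocIds == validDocIds) {
          // Duplicate key within the segment being loaded: only a strictly newer record wins.
          if (timestamp > current.timestamp) {
            validDocIds.clear(current.docId);
            validDocIds.set(docId);
            return new Location(segmentName, docId, timestamp, validDocIds);
          }
        } else if (timestamp >= current.timestamp) {
          // Location points at the old copy being replaced: a tie moves it to the new copy,
          // and the old copy's bitmap is left untouched.
          validDocIds.set(docId);
          return new Location(segmentName, docId, timestamp, validDocIds);
        }
        return current;
      }
      return newerWins(current, segmentName, validDocIds, docId, timestamp);
    });
  }

  // A newly consumed record in the consuming segment: only a strictly newer record wins.
  synchronized void updateRecord(String segmentName, BitSet validDocIds, String key, int docId, long timestamp) {
    keyToLocation.compute(key, (k, current) -> {
      if (current == null) {
        validDocIds.set(docId);
        return new Location(segmentName, docId, timestamp, validDocIds);
      }
      return newerWins(current, segmentName, validDocIds, docId, timestamp);
    });
  }

  // "Strictly newer timestamp wins": invalidate the previous location and point at the new one.
  private static Location newerWins(Location current, String segmentName, BitSet validDocIds, int docId,
      long timestamp) {
    if (timestamp > current.timestamp) {
      current.validDocIds.clear(current.docId);
      validDocIds.set(docId);
      return new Location(segmentName, docId, timestamp, validDocIds);
    }
    return current;
  }
}
```

In this sketch, loading a segment under the same name as the copy it replaces (for example, the committed version of a consuming segment) moves every key whose timestamp ties or beats the old copy onto the new bitmap, while the old copy's bitmap keeps its bits. A record arriving through `updateRecord` only takes over a key with a strictly newer timestamp.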

##########
File path: 
pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
##########
@@ -117,8 +119,9 @@ public void buildSegment()
   public void loadSegment()
       throws Exception {
     _indexSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), ReadMode.heap);
-    _upsertIndexSegment = ImmutableSegmentLoader
-        .load(new File(INDEX_DIR, SEGMENT_NAME), ReadMode.heap, new PartitionUpsertMetadataManager());
+    _upsertIndexSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), ReadMode.heap);
+    ((ImmutableSegmentImpl) _upsertIndexSegment)
+        .enableUpsert(new PartitionUpsertMetadataManager(), new ThreadSafeMutableRoaringBitmap());

Review comment:
       nit: I think it's preferable to enable it as early as possible (i.e., in the constructor), since we know this segment will be an upsert one.
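A minimal sketch of the constructor-based alternative the nit suggests. The names (`SegmentSketch`, `getValidDocIds`, `isUpsertEnabled`) are hypothetical, `java.util.BitSet` stands in for `ThreadSafeMutableRoaringBitmap`, and the metadata manager is omitted; the real `ImmutableSegmentImpl` constructor signature is not shown in this diff. Passing the upsert state in at construction lets the field be `final`, so the upsert state cannot change after the segment is published.

```java
import java.util.BitSet;
import java.util.Objects;

// Hypothetical sketch of constructor-time upsert enablement; not the ImmutableSegmentImpl API.
// BitSet stands in for ThreadSafeMutableRoaringBitmap; the metadata manager is omitted.
final class SegmentSketch {
  private final String segmentName;
  // Fixed at construction: null means upsert is disabled for this segment.
  private final BitSet validDocIds;

  // Non-upsert segment.
  SegmentSketch(String segmentName) {
    this(segmentName, null);
  }

  // Upsert segment: the valid doc ids are known before the object is published.
  SegmentSketch(String segmentName, BitSet validDocIds) {
    this.segmentName = Objects.requireNonNull(segmentName, "segmentName");
    this.validDocIds = validDocIds;
  }

  String getSegmentName() {
    return segmentName;
  }

  boolean isUpsertEnabled() {
    return validDocIds != null;
  }

  BitSet getValidDocIds() {
    return validDocIds;
  }
}
```

Compared with a post-load `enableUpsert(...)` call, this removes the window in which a loaded segment is reachable but not yet upsert-enabled, at the cost of threading the upsert state through the loader.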



