This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 8678f5e  [Upsert] Preserve the newer added record when 2 records have 
the same timestamp (#6213)
8678f5e is described below

commit 8678f5ec0a5a58a095b38d27b5ecda55efdde8ec
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Fri Oct 30 12:27:04 2020 -0700

    [Upsert] Preserve the newer added record when 2 records have the same 
timestamp (#6213)
    
    For an upsert table, a record with a newer timestamp replaces the existing 
record with an older timestamp, but when multiple records have the same 
timestamp, which record is preserved is undefined in the current implementation.
    This PR enhances the PartitionUpsertMetadataManager to preserve the latest 
ingested record if multiple records have the same timestamp:
    - If the 2 records are not in the same segment, preserve the one in the 
segment with the larger sequence number
    - If the 2 records are in the same segment, preserve the one with the 
larger doc id
    
    Note that for tables with a sorted column, the records will be re-ordered 
when committing the segment, and we will use the re-ordered doc ids instead of 
the ingestion order to decide which record to preserve.
---
 .../apache/pinot/common/utils/LLCSegmentName.java  | 14 +++-
 .../upsert/PartitionUpsertMetadataManager.java     | 54 +++++++-------
 .../upsert/PartitionUpsertMetadataManagerTest.java | 84 +++++++++++-----------
 3 files changed, 85 insertions(+), 67 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java
index 29824f1..adc24ad 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.common.utils;
 
+import org.apache.commons.lang3.StringUtils;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 
@@ -35,11 +36,11 @@ public class LLCSegmentName extends SegmentName implements 
Comparable {
       throw new RuntimeException(segmentName + " is not a Low level consumer 
segment name");
     }
 
-    String[] parts = segmentName.split(SEPARATOR);
     _segmentName = segmentName;
+    String[] parts = StringUtils.splitByWholeSeparator(segmentName, SEPARATOR);
     _tableName = parts[0];
-    _partitionId = Integer.valueOf(parts[1]);
-    _sequenceNumber = Integer.valueOf(parts[2]);
+    _partitionId = Integer.parseInt(parts[1]);
+    _sequenceNumber = Integer.parseInt(parts[2]);
     _creationTime = parts[3];
   }
 
@@ -56,6 +57,13 @@ public class LLCSegmentName extends SegmentName implements 
Comparable {
     _segmentName = tableName + SEPARATOR + partitionId + SEPARATOR + 
sequenceNumber + SEPARATOR + _creationTime;
   }
 
+  /**
+   * Returns the sequence number of the given segment name.
+   */
+  public static int getSequenceNumber(String segmentName) {
+    return Integer.parseInt(StringUtils.splitByWholeSeparator(segmentName, 
SEPARATOR)[2]);
+  }
+
   @Override
   public RealtimeSegmentType getSegmentType() {
     return RealtimeSegmentType.LLC;
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManager.java
index 24acbab..3912c2c 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManager.java
@@ -24,6 +24,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.pinot.common.metrics.ServerGauge;
 import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.core.realtime.impl.ThreadSafeMutableRoaringBitmap;
 import org.apache.pinot.spi.data.readers.PrimaryKey;
 import org.slf4j.Logger;
@@ -32,7 +33,10 @@ import org.slf4j.LoggerFactory;
 
 /**
  * Manages the upsert metadata per partition.
- * <p>For multiple records with the same timestamp, there is no guarantee on 
which record to be preserved.
+ * <p>For multiple records with the same timestamp, the manager will preserve 
the latest record based on the sequence
+ * number of the segment. If 2 records with the same timestamp are in the same 
segment, the one with larger doc id will
+ * be preserved. Note that for tables with sorted column, the records will be 
re-ordered when committing the segment,
+ * and we will use the re-ordered doc ids instead of the ingestion order to 
decide which record to preserve.
  * <p>There will be short term inconsistency when updating the upsert 
metadata, but should be consistent after the
  * operation is done:
  * <ul>
@@ -83,34 +87,35 @@ public class PartitionUpsertMetadataManager {
           if (segmentName.equals(currentRecordLocation.getSegmentName())) {
             // The current record location has the same segment name
 
-            if (validDocIds == currentRecordLocation.getValidDocIds()) {
-              // The current record location is pointing to the new segment 
being loaded
-
-              // Update the record location when getting a newer timestamp
-              if (recordInfo._timestamp > 
currentRecordLocation.getTimestamp()) {
+            // Update the record location when the new timestamp is greater 
than or equal to the current timestamp.
+            // There are 2 scenarios:
+            //   1. The current record location is pointing to the same 
segment (the segment being added). In this case,
+            //      we want to update the record location when there is a tie 
to keep the newer record. Note that the
+            //      record info iterator will return records with incremental 
doc ids.
+            //   2. The current record location is pointing to the old segment 
being replaced. This could happen when
+            //      committing a consuming segment, or reloading a completed 
segment. In this case, we want to update
+            //      the record location when there is a tie because the record 
locations should point to the new added
+            //      segment instead of the old segment being replaced. Also, 
do not update the valid doc ids for the old
+            //      segment because it has not been replaced yet.
+            if (recordInfo._timestamp >= currentRecordLocation.getTimestamp()) 
{
+              // Only update the valid doc ids for the new segment
+              if (validDocIds == currentRecordLocation.getValidDocIds()) {
                 validDocIds.remove(currentRecordLocation.getDocId());
-                validDocIds.add(recordInfo._docId);
-                return new RecordLocation(segmentName, recordInfo._docId, 
recordInfo._timestamp, validDocIds);
               }
+              validDocIds.add(recordInfo._docId);
+              return new RecordLocation(segmentName, recordInfo._docId, 
recordInfo._timestamp, validDocIds);
             } else {
-              // The current record location is pointing to the old segment 
being replaced. This could happen when
-              // committing a consuming segment, or reloading a completed 
segment.
-
-              // Update the record location when the new timestamp is greater 
than or equal to the current timestamp.
-              // Update the record location when there is a tie because the 
record locations should point to the new
-              // segment instead of the old segment being replaced. Also, do 
not update the valid doc ids for the old
-              // segment because it has not been replaced yet.
-              if (recordInfo._timestamp >= 
currentRecordLocation.getTimestamp()) {
-                validDocIds.add(recordInfo._docId);
-                return new RecordLocation(segmentName, recordInfo._docId, 
recordInfo._timestamp, validDocIds);
-              }
+              return currentRecordLocation;
             }
-            return currentRecordLocation;
           } else {
             // The current record location is pointing to a different segment
 
-            // Update the record location when getting a newer timestamp
-            if (recordInfo._timestamp > currentRecordLocation.getTimestamp()) {
+            // Update the record location when getting a newer timestamp, or 
the timestamp is the same as the current
+            // timestamp, but the segment has a larger sequence number (the 
segment is newer than the current segment).
+            if (recordInfo._timestamp > currentRecordLocation.getTimestamp() 
|| (
+                recordInfo._timestamp == currentRecordLocation.getTimestamp()
+                    && LLCSegmentName.getSequenceNumber(segmentName) > 
LLCSegmentName
+                    
.getSequenceNumber(currentRecordLocation.getSegmentName()))) {
               
currentRecordLocation.getValidDocIds().remove(currentRecordLocation.getDocId());
               validDocIds.add(recordInfo._docId);
               return new RecordLocation(segmentName, recordInfo._docId, 
recordInfo._timestamp, validDocIds);
@@ -139,8 +144,9 @@ public class PartitionUpsertMetadataManager {
       if (currentRecordLocation != null) {
         // Existing primary key
 
-        // Update the record location when getting a newer timestamp
-        if (recordInfo._timestamp > currentRecordLocation.getTimestamp()) {
+        // Update the record location when the new timestamp is greater than 
or equal to the current timestamp. Update
+        // the record location when there is a tie to keep the newer record.
+        if (recordInfo._timestamp >= currentRecordLocation.getTimestamp()) {
           
currentRecordLocation.getValidDocIds().remove(currentRecordLocation.getDocId());
           validDocIds.add(recordInfo._docId);
           return new RecordLocation(segmentName, recordInfo._docId, 
recordInfo._timestamp, validDocIds);
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManagerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManagerTest.java
index d1cd679..a2bde99 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManagerTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManagerTest.java
@@ -22,9 +22,11 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.core.realtime.impl.ThreadSafeMutableRoaringBitmap;
 import org.apache.pinot.core.upsert.PartitionUpsertMetadataManager.RecordInfo;
 import org.apache.pinot.spi.data.readers.PrimaryKey;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.mockito.Mockito;
 import org.testng.annotations.Test;
 
@@ -35,17 +37,17 @@ import static org.testng.Assert.assertSame;
 
 
 public class PartitionUpsertMetadataManagerTest {
-  private static final String SEGMENT_PREFIX = "testSegment";
-  private static final String REALTIME_TEST_TABLE = "testTable_REALTIME";
+  private static final String RAW_TABLE_NAME = "testTable";
+  private static final String REALTIME_TABLE_NAME = 
TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME);
 
   @Test
   public void testAddSegment() {
     PartitionUpsertMetadataManager upsertMetadataManager =
-        new PartitionUpsertMetadataManager(REALTIME_TEST_TABLE, 0, 
Mockito.mock(ServerMetrics.class));
+        new PartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, 
Mockito.mock(ServerMetrics.class));
     Map<PrimaryKey, RecordLocation> recordLocationMap = 
upsertMetadataManager._primaryKeyToRecordLocationMap;
 
     // Add the first segment
-    String segment1 = SEGMENT_PREFIX + 1;
+    String segment1 = getSegmentName(1);
     List<RecordInfo> recordInfoList1 = new ArrayList<>();
     recordInfoList1.add(new RecordInfo(getPrimaryKey(0), 0, 100));
     recordInfoList1.add(new RecordInfo(getPrimaryKey(1), 1, 100));
@@ -55,14 +57,14 @@ public class PartitionUpsertMetadataManagerTest {
     recordInfoList1.add(new RecordInfo(getPrimaryKey(0), 5, 100));
     ThreadSafeMutableRoaringBitmap validDocIds1 =
         upsertMetadataManager.addSegment(segment1, recordInfoList1.iterator());
-    // segment1: 0 -> {0, 100}, 1 -> {4, 120}, 2 -> {2, 100}
-    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100);
+    // segment1: 0 -> {5, 100}, 1 -> {4, 120}, 2 -> {2, 100}
+    checkRecordLocation(recordLocationMap, 0, segment1, 5, 100);
     checkRecordLocation(recordLocationMap, 1, segment1, 4, 120);
     checkRecordLocation(recordLocationMap, 2, segment1, 2, 100);
-    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{0, 2, 4});
+    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{2, 4, 5});
 
     // Add the second segment
-    String segment2 = SEGMENT_PREFIX + 2;
+    String segment2 = getSegmentName(2);
     List<RecordInfo> recordInfoList2 = new ArrayList<>();
     recordInfoList2.add(new RecordInfo(getPrimaryKey(0), 0, 100));
     recordInfoList2.add(new RecordInfo(getPrimaryKey(1), 1, 100));
@@ -71,45 +73,47 @@ public class PartitionUpsertMetadataManagerTest {
     recordInfoList2.add(new RecordInfo(getPrimaryKey(0), 4, 80));
     ThreadSafeMutableRoaringBitmap validDocIds2 =
         upsertMetadataManager.addSegment(segment2, recordInfoList2.iterator());
-    // segment1: 0 -> {0, 100}, 1 -> {4, 120}
-    // segment2: 2 -> {2, 120}, 3 -> {3, 80}
-    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100);
+    // segment1: 1 -> {4, 120}
+    // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
+    checkRecordLocation(recordLocationMap, 0, segment2, 0, 100);
     checkRecordLocation(recordLocationMap, 1, segment1, 4, 120);
     checkRecordLocation(recordLocationMap, 2, segment2, 2, 120);
     checkRecordLocation(recordLocationMap, 3, segment2, 3, 80);
-    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{0, 4});
-    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{2, 3});
+    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{4});
+    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{0, 2, 3});
 
     // Replace (reload) the first segment
     ThreadSafeMutableRoaringBitmap newValidDocIds1 =
         upsertMetadataManager.addSegment(segment1, recordInfoList1.iterator());
-    // original segment1: 0 -> {0, 100}, 1 -> {4, 120}
-    // segment2: 2 -> {2, 120}, 3 -> {3, 80}
-    // new segment1: 0 -> {0, 100}, 1 -> {4, 120}
-    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100);
+    // original segment1: 1 -> {4, 120}
+    // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
+    // new segment1: 1 -> {4, 120}
+    checkRecordLocation(recordLocationMap, 0, segment2, 0, 100);
     checkRecordLocation(recordLocationMap, 1, segment1, 4, 120);
     checkRecordLocation(recordLocationMap, 2, segment2, 2, 120);
     checkRecordLocation(recordLocationMap, 3, segment2, 3, 80);
-    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{0, 4});
-    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{2, 3});
-    assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{0, 4});
-    assertSame(recordLocationMap.get(getPrimaryKey(0)).getValidDocIds(), 
newValidDocIds1);
+    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{4});
+    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{0, 2, 3});
+    assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{4});
     assertSame(recordLocationMap.get(getPrimaryKey(1)).getValidDocIds(), 
newValidDocIds1);
 
     // Remove the original segment1
     upsertMetadataManager.removeSegment(segment1, validDocIds1);
-    // segment2: 2 -> {2, 120}, 3 -> {3, 80}
-    // new segment1: 0 -> {0, 100}, 1 -> {4, 120}
-    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100);
+    // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
+    // new segment1: 1 -> {4, 120}
+    checkRecordLocation(recordLocationMap, 0, segment2, 0, 100);
     checkRecordLocation(recordLocationMap, 1, segment1, 4, 120);
     checkRecordLocation(recordLocationMap, 2, segment2, 2, 120);
     checkRecordLocation(recordLocationMap, 3, segment2, 3, 80);
-    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{2, 3});
-    assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{0, 4});
-    assertSame(recordLocationMap.get(getPrimaryKey(0)).getValidDocIds(), 
newValidDocIds1);
+    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{0, 2, 3});
+    assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{4});
     assertSame(recordLocationMap.get(getPrimaryKey(1)).getValidDocIds(), 
newValidDocIds1);
   }
 
+  private static String getSegmentName(int sequenceNumber) {
+    return new LLCSegmentName(RAW_TABLE_NAME, 0, sequenceNumber, 
System.currentTimeMillis()).toString();
+  }
+
   private static PrimaryKey getPrimaryKey(int value) {
     return new PrimaryKey(new Object[]{value});
   }
@@ -126,12 +130,12 @@ public class PartitionUpsertMetadataManagerTest {
   @Test
   public void testUpdateRecord() {
     PartitionUpsertMetadataManager upsertMetadataManager =
-        new PartitionUpsertMetadataManager(REALTIME_TEST_TABLE, 0, 
Mockito.mock(ServerMetrics.class));
+        new PartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, 
Mockito.mock(ServerMetrics.class));
     Map<PrimaryKey, RecordLocation> recordLocationMap = 
upsertMetadataManager._primaryKeyToRecordLocationMap;
 
     // Add the first segment
     // segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}
-    String segment1 = SEGMENT_PREFIX + 1;
+    String segment1 = getSegmentName(1);
     List<RecordInfo> recordInfoList1 = new ArrayList<>();
     recordInfoList1.add(new RecordInfo(getPrimaryKey(0), 0, 100));
     recordInfoList1.add(new RecordInfo(getPrimaryKey(1), 1, 120));
@@ -140,7 +144,7 @@ public class PartitionUpsertMetadataManagerTest {
         upsertMetadataManager.addSegment(segment1, recordInfoList1.iterator());
 
     // Update records from the second segment
-    String segment2 = SEGMENT_PREFIX + 2;
+    String segment2 = getSegmentName(2);
     ThreadSafeMutableRoaringBitmap validDocIds2 = new 
ThreadSafeMutableRoaringBitmap();
 
     upsertMetadataManager.updateRecord(segment2, new 
RecordInfo(getPrimaryKey(3), 0, 100), validDocIds2);
@@ -174,32 +178,32 @@ public class PartitionUpsertMetadataManagerTest {
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{0, 1});
 
     upsertMetadataManager.updateRecord(segment2, new 
RecordInfo(getPrimaryKey(0), 3, 100), validDocIds2);
-    // segment1: 0 -> {0, 100}, 1 -> {1, 120}
-    // segment2: 2 -> {1, 120}, 3 -> {0, 100}
-    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100);
+    // segment1: 1 -> {1, 120}
+    // segment2: 0 -> {3, 100}, 2 -> {1, 120}, 3 -> {0, 100}
+    checkRecordLocation(recordLocationMap, 0, segment2, 3, 100);
     checkRecordLocation(recordLocationMap, 1, segment1, 1, 120);
     checkRecordLocation(recordLocationMap, 2, segment2, 1, 120);
     checkRecordLocation(recordLocationMap, 3, segment2, 0, 100);
-    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{0, 1});
-    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{0, 1});
+    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{1});
+    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{0, 1, 3});
   }
 
   @Test
   public void testRemoveSegment() {
     PartitionUpsertMetadataManager upsertMetadataManager =
-        new PartitionUpsertMetadataManager(REALTIME_TEST_TABLE, 0, 
Mockito.mock(ServerMetrics.class));
+        new PartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, 
Mockito.mock(ServerMetrics.class));
     Map<PrimaryKey, RecordLocation> recordLocationMap = 
upsertMetadataManager._primaryKeyToRecordLocationMap;
 
     // Add 2 segments
     // segment1: 0 -> {0, 100}, 1 -> {1, 100}
     // segment2: 2 -> {0, 100}, 3 -> {0, 100}
-    String segment1 = SEGMENT_PREFIX + 1;
+    String segment1 = getSegmentName(1);
     List<RecordInfo> recordInfoList1 = new ArrayList<>();
     recordInfoList1.add(new RecordInfo(getPrimaryKey(0), 0, 100));
     recordInfoList1.add(new RecordInfo(getPrimaryKey(1), 1, 100));
     ThreadSafeMutableRoaringBitmap validDocIds1 =
         upsertMetadataManager.addSegment(segment1, recordInfoList1.iterator());
-    String segment2 = SEGMENT_PREFIX + 1;
+    String segment2 = getSegmentName(2);
     List<RecordInfo> recordInfoList2 = new ArrayList<>();
     recordInfoList2.add(new RecordInfo(getPrimaryKey(2), 0, 100));
     recordInfoList2.add(new RecordInfo(getPrimaryKey(3), 1, 100));
@@ -211,8 +215,8 @@ public class PartitionUpsertMetadataManagerTest {
     // segment2: 2 -> {0, 100}, 3 -> {0, 100}
     assertNull(recordLocationMap.get(getPrimaryKey(0)));
     assertNull(recordLocationMap.get(getPrimaryKey(1)));
-    checkRecordLocation(recordLocationMap, 2, segment1, 0, 100);
-    checkRecordLocation(recordLocationMap, 3, segment1, 1, 100);
+    checkRecordLocation(recordLocationMap, 2, segment2, 0, 100);
+    checkRecordLocation(recordLocationMap, 3, segment2, 1, 100);
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{0, 1});
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to