This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch upsert_test in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit ac9d7f6f639118acd8b3322b9780912999fc03bb Author: Xiaotian (Jackie) Jiang <[email protected]> AuthorDate: Thu Oct 29 19:25:16 2020 -0700 [Upsert] Preserve the newer added record when 2 records have the same timestamp --- .../apache/pinot/common/utils/LLCSegmentName.java | 14 +++- .../upsert/PartitionUpsertMetadataManager.java | 54 +++++++------- .../upsert/PartitionUpsertMetadataManagerTest.java | 84 +++++++++++----------- 3 files changed, 85 insertions(+), 67 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java index 29824f1..adc24ad 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.common.utils; +import org.apache.commons.lang3.StringUtils; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; @@ -35,11 +36,11 @@ public class LLCSegmentName extends SegmentName implements Comparable { throw new RuntimeException(segmentName + " is not a Low level consumer segment name"); } - String[] parts = segmentName.split(SEPARATOR); _segmentName = segmentName; + String[] parts = StringUtils.splitByWholeSeparator(segmentName, SEPARATOR); _tableName = parts[0]; - _partitionId = Integer.valueOf(parts[1]); - _sequenceNumber = Integer.valueOf(parts[2]); + _partitionId = Integer.parseInt(parts[1]); + _sequenceNumber = Integer.parseInt(parts[2]); _creationTime = parts[3]; } @@ -56,6 +57,13 @@ public class LLCSegmentName extends SegmentName implements Comparable { _segmentName = tableName + SEPARATOR + partitionId + SEPARATOR + sequenceNumber + SEPARATOR + _creationTime; } + /** + * Returns the sequence number of the given segment name. 
+ */ + public static int getSequenceNumber(String segmentName) { + return Integer.parseInt(StringUtils.splitByWholeSeparator(segmentName, SEPARATOR)[2]); + } + @Override public RealtimeSegmentType getSegmentType() { return RealtimeSegmentType.LLC; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManager.java index 24acbab..3912c2c 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManager.java @@ -24,6 +24,7 @@ import java.util.concurrent.ConcurrentHashMap; import javax.annotation.concurrent.ThreadSafe; import org.apache.pinot.common.metrics.ServerGauge; import org.apache.pinot.common.metrics.ServerMetrics; +import org.apache.pinot.common.utils.LLCSegmentName; import org.apache.pinot.core.realtime.impl.ThreadSafeMutableRoaringBitmap; import org.apache.pinot.spi.data.readers.PrimaryKey; import org.slf4j.Logger; @@ -32,7 +33,10 @@ import org.slf4j.LoggerFactory; /** * Manages the upsert metadata per partition. - * <p>For multiple records with the same timestamp, there is no guarantee on which record to be preserved. + * <p>For multiple records with the same timestamp, the manager will preserve the latest record based on the sequence + * number of the segment. If 2 records with the same timestamp are in the same segment, the one with the larger doc id will + * be preserved. Note that for tables with a sorted column, the records will be re-ordered when committing the segment, + * and we will use the re-ordered doc ids instead of the ingestion order to decide which record to preserve. 
* <p>There will be short term inconsistency when updating the upsert metadata, but should be consistent after the * operation is done: * <ul> @@ -83,34 +87,35 @@ public class PartitionUpsertMetadataManager { if (segmentName.equals(currentRecordLocation.getSegmentName())) { // The current record location has the same segment name - if (validDocIds == currentRecordLocation.getValidDocIds()) { - // The current record location is pointing to the new segment being loaded - - // Update the record location when getting a newer timestamp - if (recordInfo._timestamp > currentRecordLocation.getTimestamp()) { + // Update the record location when the new timestamp is greater than or equal to the current timestamp. + // There are 2 scenarios: + // 1. The current record location is pointing to the same segment (the segment being added). In this case, + // we want to update the record location when there is a tie to keep the newer record. Note that the + // record info iterator will return records with increasing doc ids. + // 2. The current record location is pointing to the old segment being replaced. This could happen when + // committing a consuming segment, or reloading a completed segment. In this case, we want to update + // the record location when there is a tie because the record locations should point to the newly added + // segment instead of the old segment being replaced. Also, do not update the valid doc ids for the old + // segment because it has not been replaced yet. 
+ if (recordInfo._timestamp >= currentRecordLocation.getTimestamp()) { + // Only update the valid doc ids for the new segment + if (validDocIds == currentRecordLocation.getValidDocIds()) { validDocIds.remove(currentRecordLocation.getDocId()); - validDocIds.add(recordInfo._docId); - return new RecordLocation(segmentName, recordInfo._docId, recordInfo._timestamp, validDocIds); } + validDocIds.add(recordInfo._docId); + return new RecordLocation(segmentName, recordInfo._docId, recordInfo._timestamp, validDocIds); } else { - // The current record location is pointing to the old segment being replaced. This could happen when - // committing a consuming segment, or reloading a completed segment. - - // Update the record location when the new timestamp is greater than or equal to the current timestamp. - // Update the record location when there is a tie because the record locations should point to the new - // segment instead of the old segment being replaced. Also, do not update the valid doc ids for the old - // segment because it has not been replaced yet. - if (recordInfo._timestamp >= currentRecordLocation.getTimestamp()) { - validDocIds.add(recordInfo._docId); - return new RecordLocation(segmentName, recordInfo._docId, recordInfo._timestamp, validDocIds); - } + return currentRecordLocation; } - return currentRecordLocation; } else { // The current record location is pointing to a different segment - // Update the record location when getting a newer timestamp - if (recordInfo._timestamp > currentRecordLocation.getTimestamp()) { + // Update the record location when getting a newer timestamp, or the timestamp is the same as the current + // timestamp, but the segment has a larger sequence number (the segment is newer than the current segment). 
+ if (recordInfo._timestamp > currentRecordLocation.getTimestamp() || ( + recordInfo._timestamp == currentRecordLocation.getTimestamp() + && LLCSegmentName.getSequenceNumber(segmentName) > LLCSegmentName + .getSequenceNumber(currentRecordLocation.getSegmentName()))) { currentRecordLocation.getValidDocIds().remove(currentRecordLocation.getDocId()); validDocIds.add(recordInfo._docId); return new RecordLocation(segmentName, recordInfo._docId, recordInfo._timestamp, validDocIds); @@ -139,8 +144,9 @@ public class PartitionUpsertMetadataManager { if (currentRecordLocation != null) { // Existing primary key - // Update the record location when getting a newer timestamp - if (recordInfo._timestamp > currentRecordLocation.getTimestamp()) { + // Update the record location when the new timestamp is greater than or equal to the current timestamp. Update + // the record location when there is a tie to keep the newer record. + if (recordInfo._timestamp >= currentRecordLocation.getTimestamp()) { currentRecordLocation.getValidDocIds().remove(currentRecordLocation.getDocId()); validDocIds.add(recordInfo._docId); return new RecordLocation(segmentName, recordInfo._docId, recordInfo._timestamp, validDocIds); diff --git a/pinot-core/src/test/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManagerTest.java index d1cd679..a2bde99 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManagerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManagerTest.java @@ -22,9 +22,11 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import org.apache.pinot.common.metrics.ServerMetrics; +import org.apache.pinot.common.utils.LLCSegmentName; import org.apache.pinot.core.realtime.impl.ThreadSafeMutableRoaringBitmap; import org.apache.pinot.core.upsert.PartitionUpsertMetadataManager.RecordInfo; 
import org.apache.pinot.spi.data.readers.PrimaryKey; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.mockito.Mockito; import org.testng.annotations.Test; @@ -35,17 +37,17 @@ import static org.testng.Assert.assertSame; public class PartitionUpsertMetadataManagerTest { - private static final String SEGMENT_PREFIX = "testSegment"; - private static final String REALTIME_TEST_TABLE = "testTable_REALTIME"; + private static final String RAW_TABLE_NAME = "testTable"; + private static final String REALTIME_TABLE_NAME = TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME); @Test public void testAddSegment() { PartitionUpsertMetadataManager upsertMetadataManager = - new PartitionUpsertMetadataManager(REALTIME_TEST_TABLE, 0, Mockito.mock(ServerMetrics.class)); + new PartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Mockito.mock(ServerMetrics.class)); Map<PrimaryKey, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap; // Add the first segment - String segment1 = SEGMENT_PREFIX + 1; + String segment1 = getSegmentName(1); List<RecordInfo> recordInfoList1 = new ArrayList<>(); recordInfoList1.add(new RecordInfo(getPrimaryKey(0), 0, 100)); recordInfoList1.add(new RecordInfo(getPrimaryKey(1), 1, 100)); @@ -55,14 +57,14 @@ public class PartitionUpsertMetadataManagerTest { recordInfoList1.add(new RecordInfo(getPrimaryKey(0), 5, 100)); ThreadSafeMutableRoaringBitmap validDocIds1 = upsertMetadataManager.addSegment(segment1, recordInfoList1.iterator()); - // segment1: 0 -> {0, 100}, 1 -> {4, 120}, 2 -> {2, 100} - checkRecordLocation(recordLocationMap, 0, segment1, 0, 100); + // segment1: 0 -> {5, 100}, 1 -> {4, 120}, 2 -> {2, 100} + checkRecordLocation(recordLocationMap, 0, segment1, 5, 100); checkRecordLocation(recordLocationMap, 1, segment1, 4, 120); checkRecordLocation(recordLocationMap, 2, segment1, 2, 100); - assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 4}); + 
assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{2, 4, 5}); // Add the second segment - String segment2 = SEGMENT_PREFIX + 2; + String segment2 = getSegmentName(2); List<RecordInfo> recordInfoList2 = new ArrayList<>(); recordInfoList2.add(new RecordInfo(getPrimaryKey(0), 0, 100)); recordInfoList2.add(new RecordInfo(getPrimaryKey(1), 1, 100)); @@ -71,45 +73,47 @@ public class PartitionUpsertMetadataManagerTest { recordInfoList2.add(new RecordInfo(getPrimaryKey(0), 4, 80)); ThreadSafeMutableRoaringBitmap validDocIds2 = upsertMetadataManager.addSegment(segment2, recordInfoList2.iterator()); - // segment1: 0 -> {0, 100}, 1 -> {4, 120} - // segment2: 2 -> {2, 120}, 3 -> {3, 80} - checkRecordLocation(recordLocationMap, 0, segment1, 0, 100); + // segment1: 1 -> {4, 120} + // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80} + checkRecordLocation(recordLocationMap, 0, segment2, 0, 100); checkRecordLocation(recordLocationMap, 1, segment1, 4, 120); checkRecordLocation(recordLocationMap, 2, segment2, 2, 120); checkRecordLocation(recordLocationMap, 3, segment2, 3, 80); - assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 4}); - assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{2, 3}); + assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4}); + assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3}); // Replace (reload) the first segment ThreadSafeMutableRoaringBitmap newValidDocIds1 = upsertMetadataManager.addSegment(segment1, recordInfoList1.iterator()); - // original segment1: 0 -> {0, 100}, 1 -> {4, 120} - // segment2: 2 -> {2, 120}, 3 -> {3, 80} - // new segment1: 0 -> {0, 100}, 1 -> {4, 120} - checkRecordLocation(recordLocationMap, 0, segment1, 0, 100); + // original segment1: 1 -> {4, 120} + // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80} + // new segment1: 1 -> {4, 120} + checkRecordLocation(recordLocationMap, 0, segment2, 0, 100); 
checkRecordLocation(recordLocationMap, 1, segment1, 4, 120); checkRecordLocation(recordLocationMap, 2, segment2, 2, 120); checkRecordLocation(recordLocationMap, 3, segment2, 3, 80); - assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 4}); - assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{2, 3}); - assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 4}); - assertSame(recordLocationMap.get(getPrimaryKey(0)).getValidDocIds(), newValidDocIds1); + assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4}); + assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3}); + assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4}); assertSame(recordLocationMap.get(getPrimaryKey(1)).getValidDocIds(), newValidDocIds1); // Remove the original segment1 upsertMetadataManager.removeSegment(segment1, validDocIds1); - // segment2: 2 -> {2, 120}, 3 -> {3, 80} - // new segment1: 0 -> {0, 100}, 1 -> {4, 120} - checkRecordLocation(recordLocationMap, 0, segment1, 0, 100); + // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80} + // new segment1: 1 -> {4, 120} + checkRecordLocation(recordLocationMap, 0, segment2, 0, 100); checkRecordLocation(recordLocationMap, 1, segment1, 4, 120); checkRecordLocation(recordLocationMap, 2, segment2, 2, 120); checkRecordLocation(recordLocationMap, 3, segment2, 3, 80); - assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{2, 3}); - assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 4}); - assertSame(recordLocationMap.get(getPrimaryKey(0)).getValidDocIds(), newValidDocIds1); + assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3}); + assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4}); assertSame(recordLocationMap.get(getPrimaryKey(1)).getValidDocIds(), newValidDocIds1); } + private static String 
getSegmentName(int sequenceNumber) { + return new LLCSegmentName(RAW_TABLE_NAME, 0, sequenceNumber, System.currentTimeMillis()).toString(); + } + private static PrimaryKey getPrimaryKey(int value) { return new PrimaryKey(new Object[]{value}); } @@ -126,12 +130,12 @@ public class PartitionUpsertMetadataManagerTest { @Test public void testUpdateRecord() { PartitionUpsertMetadataManager upsertMetadataManager = - new PartitionUpsertMetadataManager(REALTIME_TEST_TABLE, 0, Mockito.mock(ServerMetrics.class)); + new PartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Mockito.mock(ServerMetrics.class)); Map<PrimaryKey, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap; // Add the first segment // segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100} - String segment1 = SEGMENT_PREFIX + 1; + String segment1 = getSegmentName(1); List<RecordInfo> recordInfoList1 = new ArrayList<>(); recordInfoList1.add(new RecordInfo(getPrimaryKey(0), 0, 100)); recordInfoList1.add(new RecordInfo(getPrimaryKey(1), 1, 120)); @@ -140,7 +144,7 @@ public class PartitionUpsertMetadataManagerTest { upsertMetadataManager.addSegment(segment1, recordInfoList1.iterator()); // Update records from the second segment - String segment2 = SEGMENT_PREFIX + 2; + String segment2 = getSegmentName(2); ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap(); upsertMetadataManager.updateRecord(segment2, new RecordInfo(getPrimaryKey(3), 0, 100), validDocIds2); @@ -174,32 +178,32 @@ public class PartitionUpsertMetadataManagerTest { assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1}); upsertMetadataManager.updateRecord(segment2, new RecordInfo(getPrimaryKey(0), 3, 100), validDocIds2); - // segment1: 0 -> {0, 100}, 1 -> {1, 120} - // segment2: 2 -> {1, 120}, 3 -> {0, 100} - checkRecordLocation(recordLocationMap, 0, segment1, 0, 100); + // segment1: 1 -> {1, 120} + // segment2: 0 -> {3, 100}, 2 -> {1, 120}, 3 -> {0, 100} + 
checkRecordLocation(recordLocationMap, 0, segment2, 3, 100); checkRecordLocation(recordLocationMap, 1, segment1, 1, 120); checkRecordLocation(recordLocationMap, 2, segment2, 1, 120); checkRecordLocation(recordLocationMap, 3, segment2, 0, 100); - assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1}); - assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1}); + assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{1}); + assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1, 3}); } @Test public void testRemoveSegment() { PartitionUpsertMetadataManager upsertMetadataManager = - new PartitionUpsertMetadataManager(REALTIME_TEST_TABLE, 0, Mockito.mock(ServerMetrics.class)); + new PartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Mockito.mock(ServerMetrics.class)); Map<PrimaryKey, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap; // Add 2 segments // segment1: 0 -> {0, 100}, 1 -> {1, 100} // segment2: 2 -> {0, 100}, 3 -> {0, 100} - String segment1 = SEGMENT_PREFIX + 1; + String segment1 = getSegmentName(1); List<RecordInfo> recordInfoList1 = new ArrayList<>(); recordInfoList1.add(new RecordInfo(getPrimaryKey(0), 0, 100)); recordInfoList1.add(new RecordInfo(getPrimaryKey(1), 1, 100)); ThreadSafeMutableRoaringBitmap validDocIds1 = upsertMetadataManager.addSegment(segment1, recordInfoList1.iterator()); - String segment2 = SEGMENT_PREFIX + 1; + String segment2 = getSegmentName(2); List<RecordInfo> recordInfoList2 = new ArrayList<>(); recordInfoList2.add(new RecordInfo(getPrimaryKey(2), 0, 100)); recordInfoList2.add(new RecordInfo(getPrimaryKey(3), 1, 100)); @@ -211,8 +215,8 @@ public class PartitionUpsertMetadataManagerTest { // segment2: 2 -> {0, 100}, 3 -> {0, 100} assertNull(recordLocationMap.get(getPrimaryKey(0))); assertNull(recordLocationMap.get(getPrimaryKey(1))); - checkRecordLocation(recordLocationMap, 2, segment1, 0, 100); - 
checkRecordLocation(recordLocationMap, 3, segment1, 1, 100); + checkRecordLocation(recordLocationMap, 2, segment2, 0, 100); + checkRecordLocation(recordLocationMap, 3, segment2, 1, 100); assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1}); } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
