This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 8678f5e [Upsert] Preserve the newer added record when 2 records have
the same timestamp (#6213)
8678f5e is described below
commit 8678f5ec0a5a58a095b38d27b5ecda55efdde8ec
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Fri Oct 30 12:27:04 2020 -0700
[Upsert] Preserve the newer added record when 2 records have the same
timestamp (#6213)
For an upsert table, a record with a newer timestamp replaces the existing
record with an older timestamp, but when multiple records have the same
timestamp, which record is preserved is undefined in the current implementation.
This PR enhances the PartitionUpsertMetadataManager to preserve the latest
ingested record if multiple records have the same timestamp:
- If the 2 records are not in the same segment, preserve the one in the segment
with the larger sequence number
- If the 2 records are in the same segment, preserve the one with the larger doc id
Note that for tables with a sorted column, the records are re-ordered
when the segment is committed, so the re-ordered docIds, rather than
the ingestion order, decide which record is preserved.
---
.../apache/pinot/common/utils/LLCSegmentName.java | 14 +++-
.../upsert/PartitionUpsertMetadataManager.java | 54 +++++++-------
.../upsert/PartitionUpsertMetadataManagerTest.java | 84 +++++++++++-----------
3 files changed, 85 insertions(+), 67 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java
index 29824f1..adc24ad 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.common.utils;
+import org.apache.commons.lang3.StringUtils;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
@@ -35,11 +36,11 @@ public class LLCSegmentName extends SegmentName implements
Comparable {
throw new RuntimeException(segmentName + " is not a Low level consumer
segment name");
}
- String[] parts = segmentName.split(SEPARATOR);
_segmentName = segmentName;
+ String[] parts = StringUtils.splitByWholeSeparator(segmentName, SEPARATOR);
_tableName = parts[0];
- _partitionId = Integer.valueOf(parts[1]);
- _sequenceNumber = Integer.valueOf(parts[2]);
+ _partitionId = Integer.parseInt(parts[1]);
+ _sequenceNumber = Integer.parseInt(parts[2]);
_creationTime = parts[3];
}
@@ -56,6 +57,13 @@ public class LLCSegmentName extends SegmentName implements
Comparable {
_segmentName = tableName + SEPARATOR + partitionId + SEPARATOR +
sequenceNumber + SEPARATOR + _creationTime;
}
+ /**
+ * Returns the sequence number of the given segment name.
+ */
+ public static int getSequenceNumber(String segmentName) {
+ return Integer.parseInt(StringUtils.splitByWholeSeparator(segmentName,
SEPARATOR)[2]);
+ }
+
@Override
public RealtimeSegmentType getSegmentType() {
return RealtimeSegmentType.LLC;
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManager.java
index 24acbab..3912c2c 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManager.java
@@ -24,6 +24,7 @@ import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.pinot.common.metrics.ServerGauge;
import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.core.realtime.impl.ThreadSafeMutableRoaringBitmap;
import org.apache.pinot.spi.data.readers.PrimaryKey;
import org.slf4j.Logger;
@@ -32,7 +33,10 @@ import org.slf4j.LoggerFactory;
/**
* Manages the upsert metadata per partition.
- * <p>For multiple records with the same timestamp, there is no guarantee on
which record to be preserved.
+ * <p>For multiple records with the same timestamp, the manager will preserve
the latest record based on the sequence
+ * number of the segment. If 2 records with the same timestamp are in the same
segment, the one with larger doc id will
+ * be preserved. Note that for tables with sorted column, the records will be
re-ordered when committing the segment,
+ * and we will use the re-ordered doc ids instead of the ingestion order to
decide which record to preserve.
* <p>There will be short term inconsistency when updating the upsert
metadata, but should be consistent after the
* operation is done:
* <ul>
@@ -83,34 +87,35 @@ public class PartitionUpsertMetadataManager {
if (segmentName.equals(currentRecordLocation.getSegmentName())) {
// The current record location has the same segment name
- if (validDocIds == currentRecordLocation.getValidDocIds()) {
- // The current record location is pointing to the new segment
being loaded
-
- // Update the record location when getting a newer timestamp
- if (recordInfo._timestamp >
currentRecordLocation.getTimestamp()) {
+ // Update the record location when the new timestamp is greater
than or equal to the current timestamp.
+ // There are 2 scenarios:
+ // 1. The current record location is pointing to the same
segment (the segment being added). In this case,
+ // we want to update the record location when there is a tie
to keep the newer record. Note that the
+ // record info iterator will return records with incremental
doc ids.
+ // 2. The current record location is pointing to the old segment
being replaced. This could happen when
+ // committing a consuming segment, or reloading a completed
segment. In this case, we want to update
+ // the record location when there is a tie because the record
locations should point to the new added
+ // segment instead of the old segment being replaced. Also,
do not update the valid doc ids for the old
+ // segment because it has not been replaced yet.
+ if (recordInfo._timestamp >= currentRecordLocation.getTimestamp())
{
+ // Only update the valid doc ids for the new segment
+ if (validDocIds == currentRecordLocation.getValidDocIds()) {
validDocIds.remove(currentRecordLocation.getDocId());
- validDocIds.add(recordInfo._docId);
- return new RecordLocation(segmentName, recordInfo._docId,
recordInfo._timestamp, validDocIds);
}
+ validDocIds.add(recordInfo._docId);
+ return new RecordLocation(segmentName, recordInfo._docId,
recordInfo._timestamp, validDocIds);
} else {
- // The current record location is pointing to the old segment
being replaced. This could happen when
- // committing a consuming segment, or reloading a completed
segment.
-
- // Update the record location when the new timestamp is greater
than or equal to the current timestamp.
- // Update the record location when there is a tie because the
record locations should point to the new
- // segment instead of the old segment being replaced. Also, do
not update the valid doc ids for the old
- // segment because it has not been replaced yet.
- if (recordInfo._timestamp >=
currentRecordLocation.getTimestamp()) {
- validDocIds.add(recordInfo._docId);
- return new RecordLocation(segmentName, recordInfo._docId,
recordInfo._timestamp, validDocIds);
- }
+ return currentRecordLocation;
}
- return currentRecordLocation;
} else {
// The current record location is pointing to a different segment
- // Update the record location when getting a newer timestamp
- if (recordInfo._timestamp > currentRecordLocation.getTimestamp()) {
+ // Update the record location when getting a newer timestamp, or
the timestamp is the same as the current
+ // timestamp, but the segment has a larger sequence number (the
segment is newer than the current segment).
+ if (recordInfo._timestamp > currentRecordLocation.getTimestamp()
|| (
+ recordInfo._timestamp == currentRecordLocation.getTimestamp()
+ && LLCSegmentName.getSequenceNumber(segmentName) >
LLCSegmentName
+
.getSequenceNumber(currentRecordLocation.getSegmentName()))) {
currentRecordLocation.getValidDocIds().remove(currentRecordLocation.getDocId());
validDocIds.add(recordInfo._docId);
return new RecordLocation(segmentName, recordInfo._docId,
recordInfo._timestamp, validDocIds);
@@ -139,8 +144,9 @@ public class PartitionUpsertMetadataManager {
if (currentRecordLocation != null) {
// Existing primary key
- // Update the record location when getting a newer timestamp
- if (recordInfo._timestamp > currentRecordLocation.getTimestamp()) {
+ // Update the record location when the new timestamp is greater than
or equal to the current timestamp. Update
+ // the record location when there is a tie to keep the newer record.
+ if (recordInfo._timestamp >= currentRecordLocation.getTimestamp()) {
currentRecordLocation.getValidDocIds().remove(currentRecordLocation.getDocId());
validDocIds.add(recordInfo._docId);
return new RecordLocation(segmentName, recordInfo._docId,
recordInfo._timestamp, validDocIds);
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManagerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManagerTest.java
index d1cd679..a2bde99 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManagerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManagerTest.java
@@ -22,9 +22,11 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.core.realtime.impl.ThreadSafeMutableRoaringBitmap;
import org.apache.pinot.core.upsert.PartitionUpsertMetadataManager.RecordInfo;
import org.apache.pinot.spi.data.readers.PrimaryKey;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.mockito.Mockito;
import org.testng.annotations.Test;
@@ -35,17 +37,17 @@ import static org.testng.Assert.assertSame;
public class PartitionUpsertMetadataManagerTest {
- private static final String SEGMENT_PREFIX = "testSegment";
- private static final String REALTIME_TEST_TABLE = "testTable_REALTIME";
+ private static final String RAW_TABLE_NAME = "testTable";
+ private static final String REALTIME_TABLE_NAME =
TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME);
@Test
public void testAddSegment() {
PartitionUpsertMetadataManager upsertMetadataManager =
- new PartitionUpsertMetadataManager(REALTIME_TEST_TABLE, 0,
Mockito.mock(ServerMetrics.class));
+ new PartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0,
Mockito.mock(ServerMetrics.class));
Map<PrimaryKey, RecordLocation> recordLocationMap =
upsertMetadataManager._primaryKeyToRecordLocationMap;
// Add the first segment
- String segment1 = SEGMENT_PREFIX + 1;
+ String segment1 = getSegmentName(1);
List<RecordInfo> recordInfoList1 = new ArrayList<>();
recordInfoList1.add(new RecordInfo(getPrimaryKey(0), 0, 100));
recordInfoList1.add(new RecordInfo(getPrimaryKey(1), 1, 100));
@@ -55,14 +57,14 @@ public class PartitionUpsertMetadataManagerTest {
recordInfoList1.add(new RecordInfo(getPrimaryKey(0), 5, 100));
ThreadSafeMutableRoaringBitmap validDocIds1 =
upsertMetadataManager.addSegment(segment1, recordInfoList1.iterator());
- // segment1: 0 -> {0, 100}, 1 -> {4, 120}, 2 -> {2, 100}
- checkRecordLocation(recordLocationMap, 0, segment1, 0, 100);
+ // segment1: 0 -> {5, 100}, 1 -> {4, 120}, 2 -> {2, 100}
+ checkRecordLocation(recordLocationMap, 0, segment1, 5, 100);
checkRecordLocation(recordLocationMap, 1, segment1, 4, 120);
checkRecordLocation(recordLocationMap, 2, segment1, 2, 100);
- assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{0, 2, 4});
+ assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{2, 4, 5});
// Add the second segment
- String segment2 = SEGMENT_PREFIX + 2;
+ String segment2 = getSegmentName(2);
List<RecordInfo> recordInfoList2 = new ArrayList<>();
recordInfoList2.add(new RecordInfo(getPrimaryKey(0), 0, 100));
recordInfoList2.add(new RecordInfo(getPrimaryKey(1), 1, 100));
@@ -71,45 +73,47 @@ public class PartitionUpsertMetadataManagerTest {
recordInfoList2.add(new RecordInfo(getPrimaryKey(0), 4, 80));
ThreadSafeMutableRoaringBitmap validDocIds2 =
upsertMetadataManager.addSegment(segment2, recordInfoList2.iterator());
- // segment1: 0 -> {0, 100}, 1 -> {4, 120}
- // segment2: 2 -> {2, 120}, 3 -> {3, 80}
- checkRecordLocation(recordLocationMap, 0, segment1, 0, 100);
+ // segment1: 1 -> {4, 120}
+ // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
+ checkRecordLocation(recordLocationMap, 0, segment2, 0, 100);
checkRecordLocation(recordLocationMap, 1, segment1, 4, 120);
checkRecordLocation(recordLocationMap, 2, segment2, 2, 120);
checkRecordLocation(recordLocationMap, 3, segment2, 3, 80);
- assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{0, 4});
- assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new
int[]{2, 3});
+ assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{4});
+ assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new
int[]{0, 2, 3});
// Replace (reload) the first segment
ThreadSafeMutableRoaringBitmap newValidDocIds1 =
upsertMetadataManager.addSegment(segment1, recordInfoList1.iterator());
- // original segment1: 0 -> {0, 100}, 1 -> {4, 120}
- // segment2: 2 -> {2, 120}, 3 -> {3, 80}
- // new segment1: 0 -> {0, 100}, 1 -> {4, 120}
- checkRecordLocation(recordLocationMap, 0, segment1, 0, 100);
+ // original segment1: 1 -> {4, 120}
+ // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
+ // new segment1: 1 -> {4, 120}
+ checkRecordLocation(recordLocationMap, 0, segment2, 0, 100);
checkRecordLocation(recordLocationMap, 1, segment1, 4, 120);
checkRecordLocation(recordLocationMap, 2, segment2, 2, 120);
checkRecordLocation(recordLocationMap, 3, segment2, 3, 80);
- assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{0, 4});
- assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new
int[]{2, 3});
- assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{0, 4});
- assertSame(recordLocationMap.get(getPrimaryKey(0)).getValidDocIds(),
newValidDocIds1);
+ assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{4});
+ assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new
int[]{0, 2, 3});
+ assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{4});
assertSame(recordLocationMap.get(getPrimaryKey(1)).getValidDocIds(),
newValidDocIds1);
// Remove the original segment1
upsertMetadataManager.removeSegment(segment1, validDocIds1);
- // segment2: 2 -> {2, 120}, 3 -> {3, 80}
- // new segment1: 0 -> {0, 100}, 1 -> {4, 120}
- checkRecordLocation(recordLocationMap, 0, segment1, 0, 100);
+ // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
+ // new segment1: 1 -> {4, 120}
+ checkRecordLocation(recordLocationMap, 0, segment2, 0, 100);
checkRecordLocation(recordLocationMap, 1, segment1, 4, 120);
checkRecordLocation(recordLocationMap, 2, segment2, 2, 120);
checkRecordLocation(recordLocationMap, 3, segment2, 3, 80);
- assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new
int[]{2, 3});
- assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{0, 4});
- assertSame(recordLocationMap.get(getPrimaryKey(0)).getValidDocIds(),
newValidDocIds1);
+ assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new
int[]{0, 2, 3});
+ assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{4});
assertSame(recordLocationMap.get(getPrimaryKey(1)).getValidDocIds(),
newValidDocIds1);
}
+ private static String getSegmentName(int sequenceNumber) {
+ return new LLCSegmentName(RAW_TABLE_NAME, 0, sequenceNumber,
System.currentTimeMillis()).toString();
+ }
+
private static PrimaryKey getPrimaryKey(int value) {
return new PrimaryKey(new Object[]{value});
}
@@ -126,12 +130,12 @@ public class PartitionUpsertMetadataManagerTest {
@Test
public void testUpdateRecord() {
PartitionUpsertMetadataManager upsertMetadataManager =
- new PartitionUpsertMetadataManager(REALTIME_TEST_TABLE, 0,
Mockito.mock(ServerMetrics.class));
+ new PartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0,
Mockito.mock(ServerMetrics.class));
Map<PrimaryKey, RecordLocation> recordLocationMap =
upsertMetadataManager._primaryKeyToRecordLocationMap;
// Add the first segment
// segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}
- String segment1 = SEGMENT_PREFIX + 1;
+ String segment1 = getSegmentName(1);
List<RecordInfo> recordInfoList1 = new ArrayList<>();
recordInfoList1.add(new RecordInfo(getPrimaryKey(0), 0, 100));
recordInfoList1.add(new RecordInfo(getPrimaryKey(1), 1, 120));
@@ -140,7 +144,7 @@ public class PartitionUpsertMetadataManagerTest {
upsertMetadataManager.addSegment(segment1, recordInfoList1.iterator());
// Update records from the second segment
- String segment2 = SEGMENT_PREFIX + 2;
+ String segment2 = getSegmentName(2);
ThreadSafeMutableRoaringBitmap validDocIds2 = new
ThreadSafeMutableRoaringBitmap();
upsertMetadataManager.updateRecord(segment2, new
RecordInfo(getPrimaryKey(3), 0, 100), validDocIds2);
@@ -174,32 +178,32 @@ public class PartitionUpsertMetadataManagerTest {
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new
int[]{0, 1});
upsertMetadataManager.updateRecord(segment2, new
RecordInfo(getPrimaryKey(0), 3, 100), validDocIds2);
- // segment1: 0 -> {0, 100}, 1 -> {1, 120}
- // segment2: 2 -> {1, 120}, 3 -> {0, 100}
- checkRecordLocation(recordLocationMap, 0, segment1, 0, 100);
+ // segment1: 1 -> {1, 120}
+ // segment2: 0 -> {3, 100}, 2 -> {1, 120}, 3 -> {0, 100}
+ checkRecordLocation(recordLocationMap, 0, segment2, 3, 100);
checkRecordLocation(recordLocationMap, 1, segment1, 1, 120);
checkRecordLocation(recordLocationMap, 2, segment2, 1, 120);
checkRecordLocation(recordLocationMap, 3, segment2, 0, 100);
- assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{0, 1});
- assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new
int[]{0, 1});
+ assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{1});
+ assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new
int[]{0, 1, 3});
}
@Test
public void testRemoveSegment() {
PartitionUpsertMetadataManager upsertMetadataManager =
- new PartitionUpsertMetadataManager(REALTIME_TEST_TABLE, 0,
Mockito.mock(ServerMetrics.class));
+ new PartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0,
Mockito.mock(ServerMetrics.class));
Map<PrimaryKey, RecordLocation> recordLocationMap =
upsertMetadataManager._primaryKeyToRecordLocationMap;
// Add 2 segments
// segment1: 0 -> {0, 100}, 1 -> {1, 100}
// segment2: 2 -> {0, 100}, 3 -> {0, 100}
- String segment1 = SEGMENT_PREFIX + 1;
+ String segment1 = getSegmentName(1);
List<RecordInfo> recordInfoList1 = new ArrayList<>();
recordInfoList1.add(new RecordInfo(getPrimaryKey(0), 0, 100));
recordInfoList1.add(new RecordInfo(getPrimaryKey(1), 1, 100));
ThreadSafeMutableRoaringBitmap validDocIds1 =
upsertMetadataManager.addSegment(segment1, recordInfoList1.iterator());
- String segment2 = SEGMENT_PREFIX + 1;
+ String segment2 = getSegmentName(2);
List<RecordInfo> recordInfoList2 = new ArrayList<>();
recordInfoList2.add(new RecordInfo(getPrimaryKey(2), 0, 100));
recordInfoList2.add(new RecordInfo(getPrimaryKey(3), 1, 100));
@@ -211,8 +215,8 @@ public class PartitionUpsertMetadataManagerTest {
// segment2: 2 -> {0, 100}, 3 -> {0, 100}
assertNull(recordLocationMap.get(getPrimaryKey(0)));
assertNull(recordLocationMap.get(getPrimaryKey(1)));
- checkRecordLocation(recordLocationMap, 2, segment1, 0, 100);
- checkRecordLocation(recordLocationMap, 3, segment1, 1, 100);
+ checkRecordLocation(recordLocationMap, 2, segment2, 0, 100);
+ checkRecordLocation(recordLocationMap, 3, segment2, 1, 100);
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new
int[]{0, 1});
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]