This is an automated email from the ASF dual-hosted git repository.
xbli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 64b901f426 update upsert TTL watermark in replaceSegment too (#14147)
64b901f426 is described below
commit 64b901f42668017e28d5be2bca43b90b2e747a19
Author: Xiaobing <[email protected]>
AuthorDate: Wed Oct 2 15:29:37 2024 -0700
update upsert TTL watermark in replaceSegment too (#14147)
* update upsert TTL watermark in replaceSegment too
---
.../upsert/BasePartitionUpsertMetadataManager.java | 53 +++++++++++++---------
...rrentMapPartitionUpsertMetadataManagerTest.java | 2 +-
2 files changed, 32 insertions(+), 23 deletions(-)
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
index 9a868b34c5..0c72f26114 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -385,9 +385,6 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
Preconditions.checkArgument(segment instanceof ImmutableSegmentImpl,
"Got unsupported segment implementation: %s for segment: %s, table:
%s", segment.getClass(), segmentName,
_tableNameWithType);
- if (isTTLEnabled()) {
- updateWatermark(segment);
- }
if (!startOperation()) {
_logger.info("Skip adding segment: {} because metadata manager is
already stopped", segment.getSegmentName());
return;
@@ -413,11 +410,20 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
return _metadataTTL > 0 || _deletedKeysTTL > 0;
}
+ protected double getMaxComparisonValue(IndexSegment segment) {
+ return ((Number)
segment.getSegmentMetadata().getColumnMetadataMap().get(_comparisonColumns.get(0))
+ .getMaxValue()).doubleValue();
+ }
+
+ protected boolean isOutOfMetadataTTL(double maxComparisonValue) {
+ return _metadataTTL > 0 && _largestSeenComparisonValue.get() !=
TTL_WATERMARK_NOT_SET
+ && maxComparisonValue < _largestSeenComparisonValue.get() -
_metadataTTL;
+ }
+
protected boolean isOutOfMetadataTTL(IndexSegment segment) {
- if (_metadataTTL > 0 && _largestSeenComparisonValue.get() !=
TTL_WATERMARK_NOT_SET) {
- Number maxComparisonValue =
- (Number)
segment.getSegmentMetadata().getColumnMetadataMap().get(_comparisonColumns.get(0)).getMaxValue();
- return maxComparisonValue.doubleValue() <
_largestSeenComparisonValue.get() - _metadataTTL;
+ if (_metadataTTL > 0) {
+ double maxComparisonValue = getMaxComparisonValue(segment);
+ return isOutOfMetadataTTL(maxComparisonValue);
}
return false;
}
@@ -451,8 +457,12 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
protected void doAddSegment(ImmutableSegmentImpl segment) {
String segmentName = segment.getSegmentName();
_logger.info("Adding segment: {}, current primary key count: {}",
segmentName, getNumPrimaryKeys());
- if (isOutOfMetadataTTL(segment) && skipAddSegmentOutOfTTL(segment)) {
- return;
+ if (isTTLEnabled()) {
+ double maxComparisonValue = getMaxComparisonValue(segment);
+ _largestSeenComparisonValue.getAndUpdate(v -> Math.max(v,
maxComparisonValue));
+ if (isOutOfMetadataTTL(maxComparisonValue) &&
skipAddSegmentOutOfTTL(segment)) {
+ return;
+ }
}
long startTimeMs = System.currentTimeMillis();
if (!_enableSnapshot) {
@@ -495,9 +505,6 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
Preconditions.checkArgument(segment instanceof ImmutableSegmentImpl,
"Got unsupported segment implementation: %s for segment: %s, table:
%s", segment.getClass(), segmentName,
_tableNameWithType);
- if (isTTLEnabled()) {
- updateWatermark(segment);
- }
if (!startOperation()) {
_logger.info("Skip preloading segment: {} because metadata manager is
already stopped", segmentName);
return;
@@ -527,8 +534,12 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
segment.enableUpsert(this, new ThreadSafeMutableRoaringBitmap(), null);
return;
}
- if (isOutOfMetadataTTL(segment) && skipPreloadSegmentOutOfTTL(segment,
validDocIds)) {
- return;
+ if (isTTLEnabled()) {
+ double maxComparisonValue = getMaxComparisonValue(segment);
+ _largestSeenComparisonValue.getAndUpdate(v -> Math.max(v,
maxComparisonValue));
+ if (isOutOfMetadataTTL(maxComparisonValue) &&
skipPreloadSegmentOutOfTTL(segment, validDocIds)) {
+ return;
+ }
}
try (UpsertUtils.RecordInfoReader recordInfoReader = new
UpsertUtils.RecordInfoReader(segment, _primaryKeyColumns,
_comparisonColumns, _deleteRecordColumn)) {
@@ -701,7 +712,12 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
replaceSegment(segment, null, null, null, oldSegment);
return;
}
-
+ if (isTTLEnabled()) {
+ double maxComparisonValue = getMaxComparisonValue(segment);
+ _largestSeenComparisonValue.getAndUpdate(v -> Math.max(v,
maxComparisonValue));
+ // Segment might be uploaded directly to the table to replace an old
segment. So update the TTL watermark but
+ // we can't skip segment even if it's out of TTL as its validDocIds
bitmap is not updated yet.
+ }
try (UpsertUtils.RecordInfoReader recordInfoReader = new
UpsertUtils.RecordInfoReader(segment, _primaryKeyColumns,
_comparisonColumns, _deleteRecordColumn)) {
Iterator<RecordInfo> recordInfoIterator =
@@ -1031,13 +1047,6 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
return new File(_tableIndexDir, V1Constants.TTL_WATERMARK_TABLE_PARTITION
+ _partitionId);
}
- protected void updateWatermark(ImmutableSegment segment) {
- double maxComparisonValue =
- ((Number)
segment.getSegmentMetadata().getColumnMetadataMap().get(_comparisonColumns.get(0))
- .getMaxValue()).doubleValue();
- _largestSeenComparisonValue.getAndUpdate(v -> Math.max(v,
maxComparisonValue));
- }
-
@VisibleForTesting
double getWatermark() {
return _largestSeenComparisonValue.get();
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
index 90c03bb5de..b29fd68559 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
@@ -238,7 +238,7 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerTest {
ImmutableSegmentImpl segment =
mockImmutableSegmentWithEndTime(1, new
ThreadSafeMutableRoaringBitmap(), null, new ArrayList<>(),
COMPARISON_COLUMNS, new Double(currentTimeMs + 1024), new
MutableRoaringBitmap());
- upsertMetadataManager.updateWatermark(segment);
+ upsertMetadataManager.setWatermark(currentTimeMs + 1024);
assertEquals(upsertMetadataManager.getWatermark(), currentTimeMs + 1024);
// Stop the metadata manager
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]