Jackie-Jiang commented on code in PR #12037:
URL: https://github.com/apache/pinot/pull/12037#discussion_r1414717992
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java:
##########
@@ -731,6 +731,21 @@ static void validateUpsertAndDedupConfig(TableConfig
tableConfig, Schema schema)
"The delete record column must be a single-valued BOOLEAN column");
}
+ double deletedKeysTTL = upsertConfig.getDeletedKeysTTL();
Review Comment:
Shall we move this next to the `metadataTTL` validation to avoid duplicating the
`comparisonColumns` check?
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##########
@@ -231,13 +233,49 @@ protected void removeSegment(IndexSegment segment,
MutableRoaringBitmap validDoc
@Override
public void doRemoveExpiredPrimaryKeys() {
- double threshold = _largestSeenComparisonValue - _metadataTTL;
+ AtomicInteger numDeletedKeys = new AtomicInteger();
+ long startTime = System.currentTimeMillis();
+ double metadataTTLKeysThreshold;
+ if (_metadataTTL > 0) {
+ metadataTTLKeysThreshold = _largestSeenComparisonValue - _metadataTTL;
+ } else {
+ metadataTTLKeysThreshold = Double.MIN_VALUE;
+ }
+
+ double deletedKeysThreshold;
+
+ if (_deletedKeysTTL > 0) {
+ deletedKeysThreshold = _largestSeenComparisonValue - _deletedKeysTTL;
+ } else {
+ deletedKeysThreshold = Double.MIN_VALUE;
+ }
_primaryKeyToRecordLocationMap.forEach((primaryKey, recordLocation) -> {
- if (((Number) recordLocation.getComparisonValue()).doubleValue() <
threshold) {
+ if (_metadataTTL > 0 && ((Number)
recordLocation.getComparisonValue()).doubleValue() < metadataTTLKeysThreshold) {
_primaryKeyToRecordLocationMap.remove(primaryKey, recordLocation);
+ numDeletedKeys.getAndIncrement();
+ } else if (_deletedKeysTTL > 0
+ && ((Number) recordLocation.getComparisonValue()).doubleValue() <
deletedKeysThreshold) {
+ ThreadSafeMutableRoaringBitmap currentQueryableDocIds =
recordLocation.getSegment().getQueryableDocIds();
+ // if key not part of queryable doc id, it means it is deleted
+ if (currentQueryableDocIds != null &&
!currentQueryableDocIds.contains(recordLocation.getDocId())) {
Review Comment:
(minor) The `currentQueryableDocIds != null` check should be redundant.
##########
pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java:
##########
@@ -47,7 +47,10 @@ public enum ServerTimer implements AbstractMetrics.Timer {
SEGMENT_UPLOAD_TIME_MS("milliseconds", false),
TOTAL_CPU_TIME_NS("nanoseconds", false, "Total query cost (thread cpu time +
system "
- + "activities cpu time + response serialization cpu time) for query
processing on server.");
+ + "activities cpu time + response serialization cpu time) for query
processing on server."),
+
+ EXPIRED_PRIMARY_KEYS_DELETION_TIME_MS("milliseconds", false, "Total time
taken to delete expired primary keys based "
Review Comment:
(minor) Suggest renaming it to `UPSERT_REMOVE_EXPIRED_PRIMARY_KEYS_TIME_MS`
to match the function name.
As a follow-up, we can add timers for other operations as well.
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##########
@@ -231,13 +233,49 @@ protected void removeSegment(IndexSegment segment,
MutableRoaringBitmap validDoc
@Override
public void doRemoveExpiredPrimaryKeys() {
- double threshold = _largestSeenComparisonValue - _metadataTTL;
+ AtomicInteger numDeletedKeys = new AtomicInteger();
+ long startTime = System.currentTimeMillis();
+ double metadataTTLKeysThreshold;
+ if (_metadataTTL > 0) {
+ metadataTTLKeysThreshold = _largestSeenComparisonValue - _metadataTTL;
+ } else {
+ metadataTTLKeysThreshold = Double.MIN_VALUE;
+ }
+
+ double deletedKeysThreshold;
+
+ if (_deletedKeysTTL > 0) {
+ deletedKeysThreshold = _largestSeenComparisonValue - _deletedKeysTTL;
+ } else {
+ deletedKeysThreshold = Double.MIN_VALUE;
+ }
_primaryKeyToRecordLocationMap.forEach((primaryKey, recordLocation) -> {
- if (((Number) recordLocation.getComparisonValue()).doubleValue() <
threshold) {
+ if (_metadataTTL > 0 && ((Number)
recordLocation.getComparisonValue()).doubleValue() < metadataTTLKeysThreshold) {
Review Comment:
(minor) Cache `((Number) recordLocation.getComparisonValue()).doubleValue()` in a local variable instead of computing it in both branches.
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##########
@@ -231,13 +233,49 @@ protected void removeSegment(IndexSegment segment,
MutableRoaringBitmap validDoc
@Override
public void doRemoveExpiredPrimaryKeys() {
- double threshold = _largestSeenComparisonValue - _metadataTTL;
+ AtomicInteger numDeletedKeys = new AtomicInteger();
+ long startTime = System.currentTimeMillis();
+ double metadataTTLKeysThreshold;
+ if (_metadataTTL > 0) {
+ metadataTTLKeysThreshold = _largestSeenComparisonValue - _metadataTTL;
+ } else {
+ metadataTTLKeysThreshold = Double.MIN_VALUE;
+ }
+
+ double deletedKeysThreshold;
+
+ if (_deletedKeysTTL > 0) {
+ deletedKeysThreshold = _largestSeenComparisonValue - _deletedKeysTTL;
+ } else {
+ deletedKeysThreshold = Double.MIN_VALUE;
+ }
_primaryKeyToRecordLocationMap.forEach((primaryKey, recordLocation) -> {
- if (((Number) recordLocation.getComparisonValue()).doubleValue() <
threshold) {
+ if (_metadataTTL > 0 && ((Number)
recordLocation.getComparisonValue()).doubleValue() < metadataTTLKeysThreshold) {
_primaryKeyToRecordLocationMap.remove(primaryKey, recordLocation);
+ numDeletedKeys.getAndIncrement();
+ } else if (_deletedKeysTTL > 0
+ && ((Number) recordLocation.getComparisonValue()).doubleValue() <
deletedKeysThreshold) {
+ ThreadSafeMutableRoaringBitmap currentQueryableDocIds =
recordLocation.getSegment().getQueryableDocIds();
+ // if key not part of queryable doc id, it means it is deleted
+ if (currentQueryableDocIds != null &&
!currentQueryableDocIds.contains(recordLocation.getDocId())) {
+ _primaryKeyToRecordLocationMap.remove(primaryKey, recordLocation);
+ removeDocId(recordLocation.getSegment(), recordLocation.getDocId());
+ numDeletedKeys.getAndIncrement();
+ }
}
});
- persistWatermark(_largestSeenComparisonValue);
+ if (_metadataTTL > 0) {
+ persistWatermark(_largestSeenComparisonValue);
+ }
+ long duration = System.currentTimeMillis() - startTime;
+ int numDeletedTTLKeys = numDeletedKeys.get();
+ if (numDeletedTTLKeys > 0) {
+ _logger.info("Deleted {} primary deleted keys in the table {}",
numDeletedTTLKeys, _tableNameWithType);
+ _serverMetrics.addMeteredTableValue(_tableNameWithType,
ServerMeter.DELETED_KEYS_TTL_PRIMARY_KEY_REMOVED,
+ numDeletedTTLKeys);
+ }
+ _serverMetrics.addTimedTableValue(_tableNameWithType,
ServerTimer.EXPIRED_PRIMARY_KEYS_DELETION_TIME_MS, duration,
Review Comment:
(minor) This can be added to `BasePartitionUpsertMetadataManager` so that
other implementations can also record the time.
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -157,15 +160,20 @@ public void addSegment(ImmutableSegment segment) {
"Got unsupported segment implementation: {} for segment: {}, table:
{}", segment.getClass(), segmentName,
_tableNameWithType);
ImmutableSegmentImpl immutableSegment = (ImmutableSegmentImpl) segment;
+ double maxComparisonValue =
Review Comment:
(MAJOR) We cannot read the value here because there is no guarantee that the
comparison column is numeric when TTL is not enabled.
##########
pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java:
##########
@@ -48,6 +48,7 @@ public enum ServerMeter implements AbstractMetrics.Meter {
PARTIAL_UPSERT_OUT_OF_ORDER("rows", false),
PARTIAL_UPSERT_KEYS_NOT_REPLACED("rows", false),
UPSERT_OUT_OF_ORDER("rows", false),
+ DELETED_KEYS_TTL_PRIMARY_KEY_REMOVED("rows", false),
Review Comment:
+1 on adding this meter. Shall we track the keys removed by metadata TTL and the
keys removed by deleted-keys TTL as separate meters?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]