Jackie-Jiang commented on code in PR #13992:
URL: https://github.com/apache/pinot/pull/13992#discussion_r1761577304
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -1200,12 +1219,45 @@ public void trackSegmentForUpsertView(IndexSegment segment) {
if (_upsertViewManager != null) {
_upsertViewManager.trackSegment(segment);
}
+ if (segment instanceof MutableSegment) {
+ trackNewlyAddedSegment(segment);
+ }
}
@Override
public void untrackSegmentForUpsertView(IndexSegment segment) {
if (_upsertViewManager != null) {
_upsertViewManager.untrackSegment(segment);
}
+ if (segment instanceof MutableSegment) {
+ untrackNewlyAddedSegment(segment);
+ }
+ }
+
+ @VisibleForTesting
+ void trackNewlyAddedSegment(IndexSegment segment) {
+ if (_newSegmentTrackingTimeMs > 0) {
+ _newlyAddedSegments.put(segment.getSegmentName(), -1L);
+ }
+ }
+
+ @VisibleForTesting
+ void untrackNewlyAddedSegment(IndexSegment segment) {
+ if (_newSegmentTrackingTimeMs > 0) {
+ _newlyAddedSegments.put(segment.getSegmentName(), System.currentTimeMillis() + _newSegmentTrackingTimeMs);
+ }
+ }
+
+ public Set<String> getNewlyAddedSegments() {
+ if (_newSegmentTrackingTimeMs > 0) {
+ // Untrack stale segments at query time. The overhead should be limited as the tracking map should be very small.
+ long nowMs = System.currentTimeMillis();
+ if (_logger.isDebugEnabled()) {
+ _logger.debug("Cleaning stale segments from tracking map: {} with nowMs: {}", _newlyAddedSegments, nowMs);
+ }
+ _newlyAddedSegments.entrySet().removeIf(e -> e.getValue() > 0 && e.getValue() < nowMs);
Review Comment:
(minor) Can we use `values()` to remove entries?
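
A minimal sketch of this suggestion, assuming the reviewer means `Map.values()` (`java.util.Map` has no `valueSet()` method). The class and method names below are hypothetical stand-ins, not Pinot code; only the predicate is taken from the diff. Removing through the `values()` view also removes the corresponding map entries.

```java
import java.util.Map;

// Hypothetical helper for illustration only; not part of the PR.
final class StaleSegmentCleanup {
  private StaleSegmentCleanup() {
  }

  // Removes entries whose expiry timestamp is set (> 0) and already in the past.
  // Entries holding -1 (no expiry assigned yet) are kept.
  static void removeStale(Map<String, Long> newlyAddedSegments, long nowMs) {
    // values() is a live view of the map, so removing a value drops its entry.
    newlyAddedSegments.values().removeIf(expiryMs -> expiryMs > 0 && expiryMs < nowMs);
  }
}
```

This avoids the two `e.getValue()` calls per entry, and behaves the same as the `entrySet()` form as long as the predicate only inspects the value.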
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -153,11 +164,17 @@ protected BasePartitionUpsertMetadataManager(String tableNameWithType, int parti
_metadataTTL = context.getMetadataTTL();
_deletedKeysTTL = context.getDeletedKeysTTL();
_tableIndexDir = context.getTableIndexDir();
+ long trackingTimeMs = context.getNewSegmentTrackingTimeMs();
UpsertConfig.ConsistencyMode cmode = context.getConsistencyMode();
if (cmode == UpsertConfig.ConsistencyMode.SYNC || cmode == UpsertConfig.ConsistencyMode.SNAPSHOT) {
_upsertViewManager = new UpsertViewManager(cmode, context);
+ // For consistency mode, we have to track newly added segments, so use default tracking time to enable the
+ // tracking of newly added segments if it's not enabled explicitly.
+ _newSegmentTrackingTimeMs =
+ trackingTimeMs > 0 ? trackingTimeMs : UpsertViewManager.DEFAULT_NEW_SEGMENT_TRACKING_TIME_MS;
Review Comment:
Consider rejecting this config in table validation, instead of silently
ignoring a config that explicitly turns off this feature.
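
A rough sketch of what such a validation check could look like. Everything here is an assumption for illustration: the class, the local `ConsistencyMode` enum (a stand-in for `UpsertConfig.ConsistencyMode`), and in particular the convention that `null` means "not set" while a non-positive value means "explicitly disabled". How the real config distinguishes those two cases is not visible in this diff.

```java
// Hypothetical validator; names and the null-vs-non-positive convention are assumptions.
final class UpsertTrackingValidation {
  // Local stand-in for UpsertConfig.ConsistencyMode.
  enum ConsistencyMode { NONE, SYNC, SNAPSHOT }

  private UpsertTrackingValidation() {
  }

  // Fails fast when a consistency mode is enabled but new-segment tracking
  // has been explicitly turned off, instead of overriding the user's value.
  static void validate(ConsistencyMode mode, Long newSegmentTrackingTimeMs) {
    boolean consistencyEnabled = mode == ConsistencyMode.SYNC || mode == ConsistencyMode.SNAPSHOT;
    boolean explicitlyDisabled = newSegmentTrackingTimeMs != null && newSegmentTrackingTimeMs <= 0;
    if (consistencyEnabled && explicitlyDisabled) {
      throw new IllegalStateException(
          "New segment tracking cannot be disabled when consistency mode is " + mode);
    }
  }
}
```

With a check like this, the constructor would no longer need to replace a user-supplied value with the default when the user asked for the feature to be off.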
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -1200,12 +1219,45 @@ public void trackSegmentForUpsertView(IndexSegment segment) {
if (_upsertViewManager != null) {
_upsertViewManager.trackSegment(segment);
}
+ if (segment instanceof MutableSegment) {
+ trackNewlyAddedSegment(segment);
+ }
}
@Override
public void untrackSegmentForUpsertView(IndexSegment segment) {
if (_upsertViewManager != null) {
_upsertViewManager.untrackSegment(segment);
}
+ if (segment instanceof MutableSegment) {
+ untrackNewlyAddedSegment(segment);
+ }
+ }
+
+ @VisibleForTesting
+ void trackNewlyAddedSegment(IndexSegment segment) {
+ if (_newSegmentTrackingTimeMs > 0) {
+ _newlyAddedSegments.put(segment.getSegmentName(), -1L);
+ }
+ }
+
+ @VisibleForTesting
+ void untrackNewlyAddedSegment(IndexSegment segment) {
Review Comment:
Why do we need to differentiate track/untrack? It is quite confusing that
the segments are still tracked after calling untrack. Some Javadoc would be
helpful.
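
One reading of the diff, sketched as a standalone class with the kind of Javadoc being requested: "track" stores `-1` (no expiry), "untrack" stores an expiry timestamp rather than removing the entry, and stale entries are dropped on read. The class and method names are hypothetical, chosen to make the two states explicit, and the clock is passed in as a parameter only to keep the sketch testable. The author's actual intent may differ.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical illustration of the tracking semantics seen in the diff; not Pinot code.
final class NewSegmentTracker {
  private final Map<String, Long> _newlyAddedSegments = new ConcurrentHashMap<>();
  private final long _trackingTimeMs;

  NewSegmentTracker(long trackingTimeMs) {
    _trackingTimeMs = trackingTimeMs;
  }

  /** Marks the segment as newly added with no expiry (value -1). */
  void markActive(String segmentName) {
    if (_trackingTimeMs > 0) {
      _newlyAddedSegments.put(segmentName, -1L);
    }
  }

  /**
   * Starts the expiry countdown. The segment stays in the map until
   * nowMs + trackingTimeMs has passed; it is NOT removed immediately.
   */
  void startExpiry(String segmentName, long nowMs) {
    if (_trackingTimeMs > 0) {
      _newlyAddedSegments.put(segmentName, nowMs + _trackingTimeMs);
    }
  }

  /** Drops entries whose expiry has passed, then returns a copy of the remaining names. */
  Set<String> getTracked(long nowMs) {
    _newlyAddedSegments.values().removeIf(expiryMs -> expiryMs > 0 && expiryMs < nowMs);
    return new HashSet<>(_newlyAddedSegments.keySet());
  }
}
```

Names such as `markActive` and `startExpiry` are only one option; the point is that the method name or its Javadoc should say that "untrack" delays removal rather than performing it.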
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]