Copilot commented on code in PR #17163:
URL: https://github.com/apache/pinot/pull/17163#discussion_r2513575327
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java:
##########
@@ -521,6 +546,25 @@ public long getPartitionIngestionTimeMs(int partitionId) {
return ingestionInfo != null ? ingestionInfo._ingestionTimeMs : Long.MIN_VALUE;
}
+ /**
+ * Method to get end to end ingestion delay for a given partition.
+ *
+ * @param partitionId partition for which we are retrieving the delay
+ *
+ * @return End to end ingestion delay in milliseconds for the given partition ID.
+ */
+ public long getPartitionEndToEndIngestionDelayMs(int partitionId) {
+ IngestionInfo ingestionInfo = _ingestionInfoMap.get(partitionId);
+ long firstStreamIngestionTimeMs = 0;
+ if ((ingestionInfo != null) && (ingestionInfo._firstStreamIngestionTimeMs > 0)) {
Review Comment:
The condition on line 559 checks if
`ingestionInfo._firstStreamIngestionTimeMs > 0`, but this incorrectly treats 0
as an invalid timestamp. If the first stream ingestion timestamp is exactly 0
(epoch), the method would incorrectly return the current time as the delay.
Change the check to `ingestionInfo._firstStreamIngestionTimeMs !=
Long.MIN_VALUE` to align with how invalid timestamps are represented elsewhere
in this class.
```suggestion
if ((ingestionInfo != null) && (ingestionInfo._firstStreamIngestionTimeMs != Long.MIN_VALUE)) {
```
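Here is a minimal sketch of the suggested check. Like `getPartitionIngestionTimeMs()` above, it assumes that `Long.MIN_VALUE` is the "no timestamp" sentinel. `EndToEndDelaySketch`, `update()`, and the explicit `nowMs` parameter are hypothetical simplifications added for testability. Returning the sentinel when no valid timestamp exists is also an assumption, and the PR's method may handle that case differently. The point is that an epoch (0) timestamp counts as valid under the sentinel check but is rejected by `> 0`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified stand-in for the tracker; only the sentinel logic is modeled.
class EndToEndDelaySketch {
  // Assumption: Long.MIN_VALUE means "no first-stream ingestion timestamp",
  // matching the sentinel returned by getPartitionIngestionTimeMs().
  static final long NO_TIMESTAMP = Long.MIN_VALUE;

  static final class IngestionInfo {
    final long _firstStreamIngestionTimeMs;

    IngestionInfo(long firstStreamIngestionTimeMs) {
      _firstStreamIngestionTimeMs = firstStreamIngestionTimeMs;
    }
  }

  private final Map<Integer, IngestionInfo> _ingestionInfoMap = new ConcurrentHashMap<>();

  // Hypothetical helper to record the latest first-stream ingestion timestamp.
  void update(int partitionId, long firstStreamIngestionTimeMs) {
    _ingestionInfoMap.put(partitionId, new IngestionInfo(firstStreamIngestionTimeMs));
  }

  // Delay relative to nowMs, or NO_TIMESTAMP when no valid timestamp is known (assumed behavior).
  long getPartitionEndToEndIngestionDelayMs(int partitionId, long nowMs) {
    IngestionInfo info = _ingestionInfoMap.get(partitionId);
    if (info != null && info._firstStreamIngestionTimeMs != NO_TIMESTAMP) {
      return nowMs - info._firstStreamIngestionTimeMs;
    }
    return NO_TIMESTAMP;
  }

  public static void main(String[] args) {
    EndToEndDelaySketch tracker = new EndToEndDelaySketch();
    tracker.update(0, 0L);           // epoch timestamp: valid under the sentinel check
    tracker.update(1, NO_TIMESTAMP); // explicitly unknown
    System.out.println(tracker.getPartitionEndToEndIngestionDelayMs(0, 1_000L)); // 1000
    System.out.println(tracker.getPartitionEndToEndIngestionDelayMs(1, 1_000L) == NO_TIMESTAMP); // true
    System.out.println(tracker.getPartitionEndToEndIngestionDelayMs(2, 1_000L) == NO_TIMESTAMP); // true
  }
}
```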
##########
pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java:
##########
@@ -98,6 +98,7 @@ public enum ServerGauge implements AbstractMetrics.Gauge {
REALTIME_INGESTION_CONSUMING_OFFSET("consumingOffset", false, "The offset of the last consumed message."),
REALTIME_INGESTION_DELAY_MS("milliseconds", false,
"The difference of the current timestamp and the timestamp present in the last consumed message record."),
+ END_TO_END_REALTIME_INGESTION_DELAY_MS("milliseconds", false),
Review Comment:
The `END_TO_END_REALTIME_INGESTION_DELAY_MS` gauge lacks a description
parameter. Add a descriptive third parameter explaining what this metric
measures, such as:
`"The difference between current timestamp and the first stream ingestion timestamp of the last consumed message."`
```suggestion
END_TO_END_REALTIME_INGESTION_DELAY_MS("milliseconds", false,
"The difference between current timestamp and the first stream ingestion timestamp of the last consumed message."),
```
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java:
##########
@@ -320,22 +332,10 @@ public Thread newThread(Runnable r) {
* @param partitionId partition ID which we should stop tracking.
*/
private void removePartitionId(int partitionId) {
- _partitionsTracked.compute(partitionId, (k, v) -> {
- if (v != null) {
- int streamConfigIndex = IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(partitionId);
- StreamMetadataProvider streamMetadataProvider =
- _streamConfigIndexToStreamMetadataProvider.get(streamConfigIndex);
- // Remove all metrics associated with this partition
- if (streamMetadataProvider != null && streamMetadataProvider.supportsOffsetLag()) {
- _serverMetrics.removePartitionGauge(_metricName, partitionId, ServerGauge.REALTIME_INGESTION_OFFSET_LAG);
- _serverMetrics.removePartitionGauge(_metricName, partitionId, ServerGauge.REALTIME_INGESTION_UPSTREAM_OFFSET);
- _serverMetrics.removePartitionGauge(_metricName, partitionId,
- ServerGauge.REALTIME_INGESTION_CONSUMING_OFFSET);
- }
- _serverMetrics.removePartitionGauge(_metricName, partitionId, ServerGauge.REALTIME_INGESTION_DELAY_MS);
- LOGGER.info("Successfully removed ingestion metrics for partition id: {}", partitionId);
- }
- _ingestionInfoMap.remove(partitionId);
+ _partitionsHostedByThisServer.remove(partitionId);
+ _ingestionInfoMap.remove(partitionId);
+ _partitionsTracked.computeIfPresent(partitionId, (k, v) -> {
+ removeMetrics(partitionId);
return null;
});
Review Comment:
The removal of `partitionId` from `_partitionsHostedByThisServer` and
`_ingestionInfoMap` on lines 335-336 is not atomic with the metric removal in
`_partitionsTracked`. This creates a race condition where
`trackIngestionDelay()` could observe the partition as removed from the hosted
list but still present in `_partitionsTracked`, leading to inconsistent state.
Consider wrapping these operations in a synchronized block or reordering to
remove from `_partitionsTracked` first.
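Below is a minimal sketch of the reviewer's suggestion. It assumes a single lock, the hypothetical `_stateLock`, that is shared by `removePartitionId()` and the tracking pass. The three maps are reduced to placeholder value types, and `PartitionRemovalSketch`, `markHosted()` and `isTracked()` are hypothetical. Following the suggested order, removal clears `_partitionsTracked` first, and all three removals happen inside the lock, so the tracking pass cannot see a half-removed partition. In this sketch `removeMetrics()` only records the call and is not the PR's implementation.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified model of the tracker's three maps.
class PartitionRemovalSketch {
  private final Map<Integer, Long> _partitionsHostedByThisServer = new ConcurrentHashMap<>();
  private final Map<Integer, Long> _ingestionInfoMap = new ConcurrentHashMap<>();
  private final Map<Integer, Boolean> _partitionsTracked = new ConcurrentHashMap<>();
  // Assumption: one lock shared by removal and the periodic tracking pass.
  private final Object _stateLock = new Object();
  // Records which partitions had removeMetrics() called, for illustration only.
  final Set<Integer> _removedMetrics = ConcurrentHashMap.newKeySet();

  void markHosted(int partitionId, long ingestionTimeMs) {
    synchronized (_stateLock) {
      _partitionsHostedByThisServer.put(partitionId, ingestionTimeMs);
      _ingestionInfoMap.put(partitionId, ingestionTimeMs);
    }
  }

  // Removal: _partitionsTracked first, then the other maps, all under the same lock.
  void removePartitionId(int partitionId) {
    synchronized (_stateLock) {
      _partitionsTracked.computeIfPresent(partitionId, (k, v) -> {
        removeMetrics(k);
        return null; // returning null removes the entry
      });
      _partitionsHostedByThisServer.remove(partitionId);
      _ingestionInfoMap.remove(partitionId);
    }
  }

  // Tracking pass: the membership check and the add are atomic with respect to removal.
  void trackIngestionDelay() {
    synchronized (_stateLock) {
      for (Integer partitionId : _partitionsHostedByThisServer.keySet()) {
        if (_ingestionInfoMap.containsKey(partitionId)) {
          _partitionsTracked.putIfAbsent(partitionId, Boolean.TRUE);
        }
      }
    }
  }

  boolean isTracked(int partitionId) {
    return _partitionsTracked.containsKey(partitionId);
  }

  private void removeMetrics(int partitionId) {
    _removedMetrics.add(partitionId);
  }

  public static void main(String[] args) {
    PartitionRemovalSketch s = new PartitionRemovalSketch();
    s.markHosted(3, 100L);
    s.trackIngestionDelay();
    System.out.println(s.isTracked(3)); // true
    s.removePartitionId(3);
    s.trackIngestionDelay();
    System.out.println(s.isTracked(3)); // false
    System.out.println(s._removedMetrics.contains(3)); // true
  }
}
```

Reordering alone narrows the window, but it may not close it. A tracking pass that works from an earlier snapshot of hosted partitions could still re-add the partition between the removals. The lock closes the window at the cost of serializing removal and the tracking pass.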
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java:
##########
@@ -242,6 +247,11 @@ private void trackIngestionDelay() {
for (Integer partitionId : partitionsHosted) {
_partitionsTracked.computeIfAbsent(partitionId, k -> {
+ // Check below condition in-case partition was stopped being tracked. Eg: due to manual operations like -
+ // calling remove ingestion metrics API
+ if (!_partitionsHostedByThisServer.containsKey(partitionId) && !_ingestionInfoMap.containsKey(partitionId)) {
Review Comment:
The condition on line 252 uses AND logic but should use OR. If a partition
was removed, either `_partitionsHostedByThisServer` or `_ingestionInfoMap`
might not contain it, but not necessarily both. Change it to
`if (!_partitionsHostedByThisServer.containsKey(partitionId) || !_ingestionInfoMap.containsKey(partitionId))`
so that a partition is skipped as soon as either map shows it is no longer
being tracked.
```suggestion
if (!_partitionsHostedByThisServer.containsKey(partitionId) || !_ingestionInfoMap.containsKey(partitionId)) {
```
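The helper below shows where the two guards actually diverge. `SkipGuardSketch` is hypothetical and not part of the PR. It enumerates all four combinations of "partition is in `_partitionsHostedByThisServer`" and "partition is in `_ingestionInfoMap`". The guards agree when both maps contain the partition or neither does. They differ only when exactly one map still has it, which is the partially removed state the review is concerned with.

```java
// Hypothetical helper comparing the PR's AND guard with the suggested OR guard.
class SkipGuardSketch {
  // PR version: skip only when the partition is missing from BOTH maps.
  static boolean skipWithAnd(boolean hosted, boolean hasIngestionInfo) {
    return !hosted && !hasIngestionInfo;
  }

  // Suggested version: skip when the partition is missing from EITHER map.
  static boolean skipWithOr(boolean hosted, boolean hasIngestionInfo) {
    return !hosted || !hasIngestionInfo;
  }

  public static void main(String[] args) {
    boolean[] values = {true, false};
    for (boolean hosted : values) {
      for (boolean hasInfo : values) {
        System.out.printf("hosted=%-5b info=%-5b AND-skip=%-5b OR-skip=%b%n",
            hosted, hasInfo, skipWithAnd(hosted, hasInfo), skipWithOr(hosted, hasInfo));
      }
    }
  }
}
```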