Copilot commented on code in PR #17163:
URL: https://github.com/apache/pinot/pull/17163#discussion_r2513575327


##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java:
##########
@@ -521,6 +546,25 @@ public long getPartitionIngestionTimeMs(int partitionId) {
     return ingestionInfo != null ? ingestionInfo._ingestionTimeMs : Long.MIN_VALUE;
   }
 
+  /**
+   * Method to get end to end ingestion delay for a given partition.
+   *
+   * @param partitionId partition for which we are retrieving the delay
+   *
+   * @return End to end ingestion delay in milliseconds for the given partition ID.
+   */
+  public long getPartitionEndToEndIngestionDelayMs(int partitionId) {
+    IngestionInfo ingestionInfo = _ingestionInfoMap.get(partitionId);
+    long firstStreamIngestionTimeMs = 0;
+    if ((ingestionInfo != null) && (ingestionInfo._firstStreamIngestionTimeMs > 0)) {

Review Comment:
   The condition on line 559 checks if 
`ingestionInfo._firstStreamIngestionTimeMs > 0`, but this incorrectly treats 0 
as an invalid timestamp. If the first stream ingestion timestamp is exactly 0 
(epoch), the method would incorrectly return the current time as the delay. 
Change the check to `ingestionInfo._firstStreamIngestionTimeMs != 
Long.MIN_VALUE` to align with how invalid timestamps are represented elsewhere 
in this class.
   ```suggestion
       if ((ingestionInfo != null) && (ingestionInfo._firstStreamIngestionTimeMs != Long.MIN_VALUE)) {
   ```
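A minimal sketch of the sentinel convention proposed above. It assumes that `Long.MIN_VALUE` marks a missing first-stream timestamp and that the method should report 0, not the current time, when no valid timestamp is known. The class name, the `update` helper and the explicit `nowMs` parameter are hypothetical and not part of the PR. `nowMs` replaces a clock read so the result is deterministic.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch, not the PR's code: Long.MIN_VALUE means "no timestamp",
// so a first stream ingestion time of exactly 0 (the epoch) is still a valid value.
final class EndToEndDelaySketch {
  static final long UNKNOWN_TIMESTAMP_MS = Long.MIN_VALUE;

  // Simplified stand-in for the tracker's per-partition IngestionInfo.
  static final class IngestionInfo {
    final long _firstStreamIngestionTimeMs;

    IngestionInfo(long firstStreamIngestionTimeMs) {
      _firstStreamIngestionTimeMs = firstStreamIngestionTimeMs;
    }
  }

  private final Map<Integer, IngestionInfo> _ingestionInfoMap = new ConcurrentHashMap<>();

  // Hypothetical helper that records the first stream ingestion time for a partition.
  void update(int partitionId, long firstStreamIngestionTimeMs) {
    _ingestionInfoMap.put(partitionId, new IngestionInfo(firstStreamIngestionTimeMs));
  }

  // Returns nowMs minus the first stream ingestion time. Returns 0 (an assumed policy)
  // when the partition is unknown or still holds the sentinel value.
  long getPartitionEndToEndIngestionDelayMs(int partitionId, long nowMs) {
    IngestionInfo ingestionInfo = _ingestionInfoMap.get(partitionId);
    if (ingestionInfo == null || ingestionInfo._firstStreamIngestionTimeMs == UNKNOWN_TIMESTAMP_MS) {
      return 0;
    }
    return nowMs - ingestionInfo._firstStreamIngestionTimeMs;
  }
}
```

Under this convention, an epoch timestamp of 0 produces a delay of `nowMs`. The sentinel check also keeps the method from computing `nowMs - Long.MIN_VALUE`, which would overflow.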



##########
pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java:
##########
@@ -98,6 +98,7 @@ public enum ServerGauge implements AbstractMetrics.Gauge {
   REALTIME_INGESTION_CONSUMING_OFFSET("consumingOffset", false, "The offset of the last consumed message."),
   REALTIME_INGESTION_DELAY_MS("milliseconds", false,
       "The difference of the current timestamp and the timestamp present in the last consumed message record."),
+  END_TO_END_REALTIME_INGESTION_DELAY_MS("milliseconds", false),

Review Comment:
   The `END_TO_END_REALTIME_INGESTION_DELAY_MS` gauge lacks a description
parameter. Add a descriptive third parameter explaining what this metric
measures, such as:
`"The difference between current timestamp and the first stream ingestion timestamp of the last consumed message."`
   ```suggestion
     END_TO_END_REALTIME_INGESTION_DELAY_MS("milliseconds", false,
         "The difference between current timestamp and the first stream ingestion timestamp of the last consumed message."),
   ```
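The sketch below is a hypothetical, cut-down enum with the same `(unit, global, description)` constructor shape as the suggestion. Reading the second argument as a "global" flag is an assumption about `ServerGauge`, and this is not Pinot's `AbstractMetrics.Gauge` implementation. Because the sketch exposes only the three-argument constructor, omitting a description becomes a compile-time error.

```java
// Hypothetical, simplified gauge enum; not Pinot's ServerGauge.
enum GaugeSketch {
  REALTIME_INGESTION_DELAY_MS("milliseconds", false,
      "The difference of the current timestamp and the timestamp present in the last consumed message record."),
  END_TO_END_REALTIME_INGESTION_DELAY_MS("milliseconds", false,
      "The difference between current timestamp and the first stream ingestion timestamp of the last consumed message.");

  private final String _unit;
  private final boolean _global; // assumed meaning of the second constructor argument
  private final String _description;

  // Only a three-argument constructor exists, so every constant must carry a description.
  GaugeSketch(String unit, boolean global, String description) {
    _unit = unit;
    _global = global;
    _description = description;
  }

  String getUnit() {
    return _unit;
  }

  boolean isGlobal() {
    return _global;
  }

  String getDescription() {
    return _description;
  }
}
```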



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java:
##########
@@ -320,22 +332,10 @@ public Thread newThread(Runnable r) {
    * @param partitionId partition ID which we should stop tracking.
    */
   private void removePartitionId(int partitionId) {
-    _partitionsTracked.compute(partitionId, (k, v) -> {
-      if (v != null) {
-        int streamConfigIndex = IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(partitionId);
-        StreamMetadataProvider streamMetadataProvider =
-            _streamConfigIndexToStreamMetadataProvider.get(streamConfigIndex);
-        // Remove all metrics associated with this partition
-        if (streamMetadataProvider != null && streamMetadataProvider.supportsOffsetLag()) {
-          _serverMetrics.removePartitionGauge(_metricName, partitionId, ServerGauge.REALTIME_INGESTION_OFFSET_LAG);
-          _serverMetrics.removePartitionGauge(_metricName, partitionId, ServerGauge.REALTIME_INGESTION_UPSTREAM_OFFSET);
-          _serverMetrics.removePartitionGauge(_metricName, partitionId,
-              ServerGauge.REALTIME_INGESTION_CONSUMING_OFFSET);
-        }
-        _serverMetrics.removePartitionGauge(_metricName, partitionId, ServerGauge.REALTIME_INGESTION_DELAY_MS);
-        LOGGER.info("Successfully removed ingestion metrics for partition id: {}", partitionId);
-      }
-      _ingestionInfoMap.remove(partitionId);
+    _partitionsHostedByThisServer.remove(partitionId);
+    _ingestionInfoMap.remove(partitionId);
+    _partitionsTracked.computeIfPresent(partitionId, (k, v) -> {
+      removeMetrics(partitionId);
       return null;
     });

Review Comment:
   The removal of `partitionId` from `_partitionsHostedByThisServer` and 
`_ingestionInfoMap` on lines 335-336 is not atomic with the metric removal in 
`_partitionsTracked`. This creates a race condition where 
`trackIngestionDelay()` could observe the partition as removed from the hosted 
list but still present in `_partitionsTracked`, leading to inconsistent state. 
Consider wrapping these operations in a synchronized block or reordering to 
remove from `_partitionsTracked` first.
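The following is one way to apply the reordering advice, sketched under some assumptions. The maps are simplified to `ConcurrentHashMap`s with placeholder value types, and `removeMetrics` is a hypothetical stub that only counts calls. All removals run inside a single `compute` call on `_partitionsTracked`. `ConcurrentHashMap` performs that call atomically per key, so a `computeIfAbsent` for the same partition ID in `trackIngestionDelay()` blocks until the removal finishes and then sees a consistent state.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: every per-partition removal happens inside one compute() call
// on _partitionsTracked, so it cannot interleave with a computeIfAbsent() on the same key.
final class PartitionRemovalSketch {
  final Map<Integer, Boolean> _partitionsTracked = new ConcurrentHashMap<>();
  final Map<Integer, Boolean> _partitionsHostedByThisServer = new ConcurrentHashMap<>();
  final Map<Integer, Long> _ingestionInfoMap = new ConcurrentHashMap<>();
  final AtomicInteger _metricsRemovedCount = new AtomicInteger(); // stand-in for real gauge removal

  void removePartitionId(int partitionId) {
    // compute() (unlike computeIfPresent()) runs even when the key is absent,
    // so the other two maps are always cleaned up.
    _partitionsTracked.compute(partitionId, (k, tracked) -> {
      // Mutating *other* maps here is allowed; _partitionsTracked itself must not be touched.
      _partitionsHostedByThisServer.remove(k);
      _ingestionInfoMap.remove(k);
      if (tracked != null) {
        removeMetrics(k);
      }
      return null; // returning null removes the mapping from _partitionsTracked
    });
  }

  // Hypothetical stub for the PR's removeMetrics(partitionId).
  private void removeMetrics(int partitionId) {
    _metricsRemovedCount.incrementAndGet();
  }
}
```

The remapping function should stay short. Writers that update `_ingestionInfoMap` outside this per-key lock are not covered by the sketch.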



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java:
##########
@@ -242,6 +247,11 @@ private void trackIngestionDelay() {
 
       for (Integer partitionId : partitionsHosted) {
         _partitionsTracked.computeIfAbsent(partitionId, k -> {
+          // Check below condition in-case partition was stopped being tracked. Eg: due to manual operations like -
+          // calling remove ingestion metrics API
+          if (!_partitionsHostedByThisServer.containsKey(partitionId) && !_ingestionInfoMap.containsKey(partitionId)) {

Review Comment:
   The condition on line 252 uses AND logic but should use OR. If a partition 
was removed, either `_partitionsHostedByThisServer` or `_ingestionInfoMap` 
might not contain it, but not necessarily both. Change to `if 
(!_partitionsHostedByThisServer.containsKey(partitionId) || 
!_ingestionInfoMap.containsKey(partitionId))` to properly handle cases where a 
partition has been stopped from tracking.
   ```suggestion
             if (!_partitionsHostedByThisServer.containsKey(partitionId) || !_ingestionInfoMap.containsKey(partitionId)) {
   ```
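The two guards can be compared on the partially removed state the comment describes: the partition is gone from the hosted set but still present in the ingestion info. In this sketch, sets stand in for the tracker's maps, and the class and method names are hypothetical. `Set.of` requires Java 9 or later.

```java
import java.util.Set;

// Hypothetical illustration of the two guard conditions discussed in the review comment.
final class TrackingGuardSketch {
  // Original guard: skips tracking only when the partition is missing from BOTH structures.
  static boolean skipWithAnd(Set<Integer> hosted, Set<Integer> ingestionInfo, int partitionId) {
    return !hosted.contains(partitionId) && !ingestionInfo.contains(partitionId);
  }

  // Suggested guard: skips tracking when the partition is missing from EITHER structure.
  static boolean skipWithOr(Set<Integer> hosted, Set<Integer> ingestionInfo, int partitionId) {
    return !hosted.contains(partitionId) || !ingestionInfo.contains(partitionId);
  }

  public static void main(String[] args) {
    // Partially removed state: dropped from the hosted set, still present in the ingestion info.
    Set<Integer> hosted = Set.of();
    Set<Integer> ingestionInfo = Set.of(7);
    System.out.println(skipWithAnd(hosted, ingestionInfo, 7)); // false: partition would be re-tracked
    System.out.println(skipWithOr(hosted, ingestionInfo, 7)); // true: partition stays untracked
  }
}
```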



-- 
This is an automated message from the Apache Git Service.