snleee commented on a change in pull request #7368:
URL: https://github.com/apache/pinot/pull/7368#discussion_r719801874



##########
File path: 
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
##########
@@ -463,4 +492,134 @@ private long getWatermarkMs(long minStartTimeMs, long 
bucketMs, String mergeLeve
 
     return pinotTaskConfigs;
   }
+
+  private long getMergeRollupTaskDelayInNumTimeBuckets(long watermarkMs, long 
bufferTimeMs, long bucketTimeMs) {
+    if (watermarkMs == -1) {
+      return 0;
+    }
+    return (System.currentTimeMillis() - watermarkMs - bufferTimeMs) / 
bucketTimeMs;
+  }
+
+  private void setWatermarkMs(String tableNameWithType, String mergeLevel, 
long watermarkMs, long bufferTimeMs,
+      long bucketTimeMs) {
+    LOGGER.info("Creating the gauge metric for tracking the merge/roll-up task 
delay for table: {} and mergeLevel: {}."
+            + "(watermarkMs={}, bufferTimeMs={}, bucketTimeMs={}, 
taskDelayInNumTimeBuckets={})", tableNameWithType,
+        mergeLevel, watermarkMs, bucketTimeMs, bucketTimeMs,
+        getMergeRollupTaskDelayInNumTimeBuckets(watermarkMs, bufferTimeMs, 
bucketTimeMs));
+
+    ControllerMetrics controllerMetrics = 
_clusterInfoAccessor.getControllerMetrics();
+    if (controllerMetrics == null) {
+      return;
+    }
+
+    // Update gauge value that indicates the delay in terms of the number of 
time buckets.
+    Map<String, Long> watermarkForTable =
+        _mergeRollupWatermarks.computeIfAbsent(tableNameWithType, k -> new 
ConcurrentHashMap<>());
+    watermarkForTable.compute(mergeLevel, (k, v) -> {
+      if (v == null) {
+        
controllerMetrics.addCallbackGaugeIfNeeded(getMetricNameForTaskDelay(tableNameWithType,
 mergeLevel),
+            (() -> 
getMergeRollupTaskDelayInNumTimeBuckets(watermarkForTable.getOrDefault(k, -1L), 
bufferTimeMs,
+                bucketTimeMs)));
+      }
+      return watermarkMs;
+    });
+  }
+
+  /**
+   * Reset the watermark for the given table name
+   * @param tableNameWithType a table name with type
+   */
+  private void resetWatermarkMs(String tableNameWithType) {
+    ControllerMetrics controllerMetrics = 
_clusterInfoAccessor.getControllerMetrics();
+    if (controllerMetrics == null) {
+      return;
+    }
+
+    // Delete all the watermarks associated with the given table name
+    Map<String, Long> watermarksForTable = 
_mergeRollupWatermarks.remove(tableNameWithType);
+    if (watermarksForTable != null) {
+      for (String mergeLevel : watermarksForTable.keySet()) {
+        
controllerMetrics.removeGauge(getMetricNameForTaskDelay(tableNameWithType, 
mergeLevel));
+      }
+    }
+  }
+
+  /**
+   * Reset the watermerk for the given table name and merge level
+   *
+   * @param tableNameWithType table name with type
+   * @param mergeLevel merge level
+   */
+  private void resetWatermarkMs(String tableNameWithType, String mergeLevel) {
+    ControllerMetrics controllerMetrics = 
_clusterInfoAccessor.getControllerMetrics();
+    if (controllerMetrics == null) {
+      return;
+    }
+
+    // Delete all the watermarks associated with the given table name

Review comment:
       updated




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to