snleee commented on a change in pull request #7368:
URL: https://github.com/apache/pinot/pull/7368#discussion_r719801874
##########
File path:
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
##########
@@ -463,4 +492,134 @@ private long getWatermarkMs(long minStartTimeMs, long
bucketMs, String mergeLeve
return pinotTaskConfigs;
}
+
+ private long getMergeRollupTaskDelayInNumTimeBuckets(long watermarkMs, long
bufferTimeMs, long bucketTimeMs) { // task delay expressed as a count of time buckets
+ if (watermarkMs == -1) { // -1 is treated as "no watermark": report a delay of 0 buckets
+ return 0;
+ }
+ return (System.currentTimeMillis() - watermarkMs - bufferTimeMs) /
bucketTimeMs; // time elapsed past (watermark + buffer), in whole buckets; long division truncates
+ }
+
+ /**
+  * Records the latest watermark for the given table and merge level. The first time a merge level is
+  * seen for a table, also registers a callback gauge that reports the merge/roll-up task delay in
+  * number of time buckets. Nothing is recorded when the controller metrics are not available.
+  *
+  * @param tableNameWithType table name with type
+  * @param mergeLevel merge level
+  * @param watermarkMs watermark in milliseconds to store for the merge level
+  * @param bufferTimeMs buffer time in milliseconds, subtracted when computing the delay
+  * @param bucketTimeMs bucket time in milliseconds, the divisor when computing the delay
+  */
+ private void setWatermarkMs(String tableNameWithType, String mergeLevel, long watermarkMs, long bufferTimeMs,
+     long bucketTimeMs) {
+   // The argument order must follow the placeholders: watermarkMs, bufferTimeMs, bucketTimeMs, delay.
+   LOGGER.info("Creating the gauge metric for tracking the merge/roll-up task delay for table: {} and mergeLevel: {}."
+       + "(watermarkMs={}, bufferTimeMs={}, bucketTimeMs={}, taskDelayInNumTimeBuckets={})", tableNameWithType,
+       mergeLevel, watermarkMs, bufferTimeMs, bucketTimeMs,
+       getMergeRollupTaskDelayInNumTimeBuckets(watermarkMs, bufferTimeMs, bucketTimeMs));
+
+   ControllerMetrics controllerMetrics = _clusterInfoAccessor.getControllerMetrics();
+   if (controllerMetrics == null) {
+     return;
+   }
+
+   // Update gauge value that indicates the delay in terms of the number of time buckets.
+   Map<String, Long> watermarkForTable =
+       _mergeRollupWatermarks.computeIfAbsent(tableNameWithType, k -> new ConcurrentHashMap<>());
+   watermarkForTable.compute(mergeLevel, (k, v) -> {
+     if (v == null) {
+       // No watermark stored yet for this merge level: register the gauge. The callback reads the
+       // current watermark from the map on every evaluation, but uses the bufferTimeMs and
+       // bucketTimeMs values captured from this call.
+       controllerMetrics.addCallbackGaugeIfNeeded(getMetricNameForTaskDelay(tableNameWithType, mergeLevel),
+           (() -> getMergeRollupTaskDelayInNumTimeBuckets(watermarkForTable.getOrDefault(k, -1L), bufferTimeMs,
+               bucketTimeMs)));
+     }
+     return watermarkMs;
+   });
+ }
+
+ /**
+ * Reset the watermark for the given table name: removes every stored merge-level watermark of the
+ * table and removes the task-delay gauge of each of those merge levels. Returns without changing
+ * anything when the controller metrics are not available.
+ * @param tableNameWithType a table name with type
+ */
+ private void resetWatermarkMs(String tableNameWithType) {
+ ControllerMetrics controllerMetrics =
_clusterInfoAccessor.getControllerMetrics();
+ if (controllerMetrics == null) {
+ return;
+ }
+
+ // Delete all the watermarks associated with the given table name
+ Map<String, Long> watermarksForTable =
_mergeRollupWatermarks.remove(tableNameWithType);
+ if (watermarksForTable != null) {
+ for (String mergeLevel : watermarksForTable.keySet()) {
+
controllerMetrics.removeGauge(getMetricNameForTaskDelay(tableNameWithType,
mergeLevel));
+ }
+ }
+ }
+
+ /**
+ * Reset the watermark for the given table name and merge level
+ *
+ * @param tableNameWithType table name with type
+ * @param mergeLevel merge level
+ */
+ private void resetWatermarkMs(String tableNameWithType, String mergeLevel) {
+ ControllerMetrics controllerMetrics =
_clusterInfoAccessor.getControllerMetrics();
+ if (controllerMetrics == null) {
+ return;
+ }
+
+ // Delete all the watermarks associated with the given table name
Review comment:
updated
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]