jtao15 commented on a change in pull request #7368:
URL: https://github.com/apache/pinot/pull/7368#discussion_r719788856
##########
File path:
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
##########
@@ -463,4 +491,134 @@ private long getWatermarkMs(long minStartTimeMs, long
bucketMs, String mergeLeve
return pinotTaskConfigs;
}
+
+ private long getMergeRollupTaskDelayInNumTimeBuckets(long watermarkMs, long
bufferTimeMs, long bucketTimeMs) {
+ if (watermarkMs == -1) {
+ return 0;
+ }
+ return (System.currentTimeMillis() - watermarkMs - bufferTimeMs) /
bucketTimeMs;
+ }
+
+ private void setWatermarkMs(String tableNameWithType, String mergeLevel,
long watermarkMs, long bufferTimeMs,
+ long bucketTimeMs) {
+ LOGGER.info("Creating the gauge metric for tracking the merge/roll-up task
delay for table: {} and mergeLevel: {}."
Review comment:
Move this line to `519`? We are creating the metric only when this
function is called the first time for a given mergeLevel.
##########
File path:
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
##########
@@ -463,4 +491,134 @@ private long getWatermarkMs(long minStartTimeMs, long
bucketMs, String mergeLeve
return pinotTaskConfigs;
}
+
+ private long getMergeRollupTaskDelayInNumTimeBuckets(long watermarkMs, long
bufferTimeMs, long bucketTimeMs) {
+ if (watermarkMs == -1) {
+ return 0;
+ }
+ return (System.currentTimeMillis() - watermarkMs - bufferTimeMs) /
bucketTimeMs;
+ }
+
+ private void setWatermarkMs(String tableNameWithType, String mergeLevel,
long watermarkMs, long bufferTimeMs,
Review comment:
Better naming? This function updates the watermark map for the purpose of
emitting metrics, but we also set the watermark in zk metadata outside of this
function. We can either put
`mergeRollupTaskMetadata.getWatermarkMap().put(mergeLevel, windowStartMs);`
inside this function or rename this function.
##########
File path:
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
##########
@@ -90,15 +92,29 @@
*/
@TaskGenerator
public class MergeRollupTaskGenerator implements PinotTaskGenerator {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(MergeRollupTaskGenerator.class);
private static final int DEFAULT_MAX_NUM_RECORDS_PER_TASK = 50_000_000;
private static final String REFRESH = "REFRESH";
- private static final Logger LOGGER =
LoggerFactory.getLogger(MergeRollupTaskGenerator.class);
+ // This is the metric that keeps track of the task delay in the number of
time buckets. For example, if we see this
Review comment:
Thanks for the detailed comments on the metric.
##########
File path:
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
##########
@@ -463,4 +492,134 @@ private long getWatermarkMs(long minStartTimeMs, long
bucketMs, String mergeLeve
return pinotTaskConfigs;
}
+
+ private long getMergeRollupTaskDelayInNumTimeBuckets(long watermarkMs, long
bufferTimeMs, long bucketTimeMs) {
+ if (watermarkMs == -1) {
+ return 0;
+ }
+ return (System.currentTimeMillis() - watermarkMs - bufferTimeMs) /
bucketTimeMs;
+ }
+
+ private void setWatermarkMs(String tableNameWithType, String mergeLevel,
long watermarkMs, long bufferTimeMs,
+ long bucketTimeMs) {
+ LOGGER.info("Creating the gauge metric for tracking the merge/roll-up task
delay for table: {} and mergeLevel: {}."
+ + "(watermarkMs={}, bufferTimeMs={}, bucketTimeMs={},
taskDelayInNumTimeBuckets={})", tableNameWithType,
+ mergeLevel, watermarkMs, bucketTimeMs, bucketTimeMs,
+ getMergeRollupTaskDelayInNumTimeBuckets(watermarkMs, bufferTimeMs,
bucketTimeMs));
+
+ ControllerMetrics controllerMetrics =
_clusterInfoAccessor.getControllerMetrics();
+ if (controllerMetrics == null) {
+ return;
+ }
+
+ // Update gauge value that indicates the delay in terms of the number of
time buckets.
+ Map<String, Long> watermarkForTable =
+ _mergeRollupWatermarks.computeIfAbsent(tableNameWithType, k -> new
ConcurrentHashMap<>());
+ watermarkForTable.compute(mergeLevel, (k, v) -> {
+ if (v == null) {
+
controllerMetrics.addCallbackGaugeIfNeeded(getMetricNameForTaskDelay(tableNameWithType,
mergeLevel),
+ (() ->
getMergeRollupTaskDelayInNumTimeBuckets(watermarkForTable.getOrDefault(k, -1L),
bufferTimeMs,
+ bucketTimeMs)));
+ }
+ return watermarkMs;
+ });
+ }
+
+ /**
+ * Reset the watermark for the given table name
+ * @param tableNameWithType a table name with type
+ */
+ private void resetWatermarkMs(String tableNameWithType) {
+ ControllerMetrics controllerMetrics =
_clusterInfoAccessor.getControllerMetrics();
+ if (controllerMetrics == null) {
+ return;
+ }
+
+ // Delete all the watermarks associated with the given table name
+ Map<String, Long> watermarksForTable =
_mergeRollupWatermarks.remove(tableNameWithType);
+ if (watermarksForTable != null) {
+ for (String mergeLevel : watermarksForTable.keySet()) {
+
controllerMetrics.removeGauge(getMetricNameForTaskDelay(tableNameWithType,
mergeLevel));
+ }
+ }
+ }
+
+ /**
+ * Reset the watermerk for the given table name and merge level
+ *
+ * @param tableNameWithType table name with type
+ * @param mergeLevel merge level
+ */
+ private void resetWatermarkMs(String tableNameWithType, String mergeLevel) {
+ ControllerMetrics controllerMetrics =
_clusterInfoAccessor.getControllerMetrics();
+ if (controllerMetrics == null) {
+ return;
+ }
+
+ // Delete all the watermarks associated with the given table name
Review comment:
(nit) // Delete the watermark associated with the merge level of the given
table name
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]