dybyte commented on code in PR #9696:
URL: https://github.com/apache/seatunnel/pull/9696#discussion_r2273393678
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java:
##########
@@ -1078,4 +1092,57 @@ public ConnectorPackageService getConnectorPackageService() {
}
return connectorPackageService;
}
+
+ private void startMetricsCleanupWorker() {
+ Runnable cleanupTask =
+ () -> {
+ Thread.currentThread().setName("metrics-cleanup-runner");
+ while (!Thread.currentThread().isInterrupted()
+ && !metricsCleanupRetryQueue.isEmpty()) {
+ try {
+ PipelineLocation pipelineLocation =
+ metricsCleanupRetryQueue.poll(
+ cleanUpRetryInterval, TimeUnit.SECONDS);
Review Comment:
We addressed this by switching to a `ScheduledExecutorService` with a fixed
delay and by checking the time of the last cleanup before running the task.
This ensures that even if a pipeline location is added to the queue
immediately after the previous cleanup, its retry is not executed right away
but waits for the configured interval.
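A minimal sketch of the scheduling approach described above, for illustration only. `MetricsCleanupScheduler`, `runOnce()` and the `String` keys (standing in for `PipelineLocation`) are hypothetical names, not the code in this PR, and `cleanupAction` stands in for the JobMaster/IMap cleanup call. Passes are scheduled with `scheduleWithFixedDelay`, and any pass that starts less than one interval after the previous one is skipped; failed keys are re-queued for a later pass.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Sketch only: String keys stand in for PipelineLocation (hypothetical).
final class MetricsCleanupScheduler {
    private final BlockingQueue<String> retryQueue = new LinkedBlockingQueue<>();
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(
                    r -> {
                        Thread t = new Thread(r, "metrics-cleanup-runner");
                        t.setDaemon(true);
                        return t;
                    });
    private final long retryIntervalNanos;
    private final Consumer<String> cleanupAction;
    // Start time (System.nanoTime) of the last pass that actually ran.
    private volatile long lastCleanupNanos;

    MetricsCleanupScheduler(long retryInterval, TimeUnit unit, Consumer<String> cleanupAction) {
        this.retryIntervalNanos = unit.toNanos(retryInterval);
        this.cleanupAction = cleanupAction;
        // Pretend the last pass was one interval ago so the first pass may run.
        this.lastCleanupNanos = System.nanoTime() - retryIntervalNanos;
    }

    // Fixed delay: each pass starts one interval after the previous pass ends.
    void start() {
        scheduler.scheduleWithFixedDelay(
                this::runOnce, retryIntervalNanos, retryIntervalNanos, TimeUnit.NANOSECONDS);
    }

    boolean enqueue(String key) {
        return retryQueue.offer(key);
    }

    // Runs one cleanup pass and returns how many keys were cleaned successfully.
    int runOnce() {
        long now = System.nanoTime();
        if (now - lastCleanupNanos < retryIntervalNanos) {
            return 0; // too soon after the previous pass; wait for the interval
        }
        lastCleanupNanos = now;
        // Drain a snapshot so keys re-queued below wait for a later pass.
        List<String> batch = new ArrayList<>();
        retryQueue.drainTo(batch);
        int cleaned = 0;
        for (String key : batch) {
            try {
                cleanupAction.accept(key);
                cleaned++;
            } catch (RuntimeException e) {
                retryQueue.offer(key); // failed: retry on a later pass
            }
        }
        return cleaned;
    }

    void shutdown() {
        scheduler.shutdownNow();
    }
}
```

Draining a snapshot with `drainTo` means a key that fails and is re-offered is not retried within the same pass, so every retry waits at least one interval.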
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java:
##########
@@ -1078,4 +1092,57 @@ public ConnectorPackageService getConnectorPackageService() {
}
return connectorPackageService;
}
+
+ private void startMetricsCleanupWorker() {
+ Runnable cleanupTask =
+ () -> {
+ Thread.currentThread().setName("metrics-cleanup-runner");
+ while (!Thread.currentThread().isInterrupted()
+ && !metricsCleanupRetryQueue.isEmpty()) {
+ try {
+ PipelineLocation pipelineLocation =
+ metricsCleanupRetryQueue.poll(
+ cleanUpRetryInterval, TimeUnit.SECONDS);
+
+ if (pipelineLocation != null) {
+ JobMaster jobMaster = getJobMaster(pipelineLocation.getJobId());
+ if (jobMaster != null) {
+ jobMaster.removeMetricsContext(
+ pipelineLocation,
+ (PipelineStatus)
+ runningJobStateIMap.get(pipelineLocation));
+ } else {
+ retryRemoveMetricsContext(pipelineLocation);
+ }
+ }
+
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ logger.info("Metrics cleanup worker interrupted, exiting...");
+ } catch (Exception e) {
+ logger.warning(
+ String.format(
+ "Metrics cleanup retry failed: %s", e.getMessage()));
+ }
+ }
+ };
+
+ executorService.submit(cleanupTask);
+ }
+
+ private void retryRemoveMetricsContext(PipelineLocation pipelineLocation) {
+ try {
+ metricsImap.compute(
+ Constant.IMAP_RUNNING_JOB_METRICS_KEY,
+ (key, centralMap) -> {
+ MetricsCleanupUtils.removeMetricsEntries(pipelineLocation, centralMap);
+ return centralMap;
+ });
+ logger.info(
+ String.format(
+ "Metrics cleanup via compute for pipeline: %s", pipelineLocation));
+ } catch (Exception e) {
Review Comment:
Currently, failed cleanup tasks are added back to the retry queue, which
could lead to an endless retry loop if a cleanup keeps failing. Should we cap
the number of retries? If so, what would be a reasonable limit? @Hisoka-X
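One possible way to cap retries, sketched under assumptions: each queued key is wrapped with an attempt counter, and after `maxAttempts` failed attempts the key is dropped (a real implementation would log it). `BoundedRetryQueue` and `RetryEntry` are hypothetical names, the queue is a single-threaded `ArrayDeque` for brevity (the worker in this PR polls a blocking queue), and the limit is left as a constructor parameter because the right value is exactly the open question here.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Predicate;

// Hypothetical wrapper: a key plus the number of failed attempts so far.
final class RetryEntry<K> {
    final K key;
    final int attempts;

    RetryEntry(K key, int attempts) {
        this.key = key;
        this.attempts = attempts;
    }
}

// Not thread-safe; illustrates the bookkeeping only.
final class BoundedRetryQueue<K> {
    private final Queue<RetryEntry<K>> queue = new ArrayDeque<>();
    private final int maxAttempts;
    private int dropped = 0;

    BoundedRetryQueue(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    void add(K key) {
        queue.offer(new RetryEntry<>(key, 0));
    }

    // One pass over the entries present at the start; cleanup returns true on success.
    void runPass(Predicate<K> cleanup) {
        int size = queue.size();
        for (int i = 0; i < size; i++) {
            RetryEntry<K> entry = queue.poll();
            boolean ok;
            try {
                ok = cleanup.test(entry.key);
            } catch (RuntimeException e) {
                ok = false;
            }
            if (ok) {
                continue;
            }
            int attempts = entry.attempts + 1;
            if (attempts < maxAttempts) {
                queue.offer(new RetryEntry<>(entry.key, attempts)); // retry later
            } else {
                dropped++; // give up after maxAttempts failures
            }
        }
    }

    int pending() {
        return queue.size();
    }

    int dropped() {
        return dropped;
    }
}
```

With `maxAttempts = 3`, a key whose cleanup always fails is attempted three times and then dropped instead of circulating forever.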
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java:
##########
@@ -892,25 +906,18 @@ public void removeMetricsContext(
Constant.IMAP_RUNNING_JOB_METRICS_KEY, 5, TimeUnit.SECONDS);
if (!lockedIMap) {
LOGGER.severe("lock imap failed in update metrics");
+ boolean offer = metricsCleanupRetryQueue.offer(pipelineLocation);
+ if (!offer) {
+ LOGGER.warning("failed to add pipelineLocation to retry queue");
+ }
return;
}
HashMap<TaskLocation, SeaTunnelMetricsContext> centralMap =
metricsImap.get(Constant.IMAP_RUNNING_JOB_METRICS_KEY);
- if (centralMap != null) {
- List<TaskLocation> collect =
- centralMap.keySet().stream()
- .filter(
- taskLocation -> {
- return taskLocation
- .getTaskGroupLocation()
- .getPipelineLocation()
- .equals(pipelineLocation);
- })
- .collect(Collectors.toList());
- collect.forEach(centralMap::remove);
- metricsImap.put(Constant.IMAP_RUNNING_JOB_METRICS_KEY, centralMap);
- }
+ MetricsCleanupUtils.removeMetricsEntries(pipelineLocation, centralMap);
+ metricsImap.put(Constant.IMAP_RUNNING_JOB_METRICS_KEY, centralMap);
Review Comment:
Done. I moved the entry-removal logic out of `removeMetricsContext()` into
`MetricsCleanupUtils.removeMetricsEntries()`, which is now shared with the
retry path.
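For reference, a sketch of what the shared helper might look like, reconstructed from the filtering logic removed from `JobMaster` in the hunk above. `PipelineLocation` and `TaskLocation` here are simplified stand-ins (the real `TaskLocation` reaches its pipeline through `getTaskGroupLocation()`), and the body is an assumption, not the helper's actual implementation. `keySet().removeIf` replaces the collect-then-remove loop, and the null check mirrors the removed `if (centralMap != null)` guard.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Simplified stand-in for SeaTunnel's PipelineLocation (hypothetical).
final class PipelineLocation {
    final long jobId;
    final int pipelineId;

    PipelineLocation(long jobId, int pipelineId) {
        this.jobId = jobId;
        this.pipelineId = pipelineId;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof PipelineLocation)) return false;
        PipelineLocation other = (PipelineLocation) o;
        return jobId == other.jobId && pipelineId == other.pipelineId;
    }

    @Override
    public int hashCode() {
        return Objects.hash(jobId, pipelineId);
    }
}

// Simplified stand-in: the real class goes through getTaskGroupLocation().
final class TaskLocation {
    private final PipelineLocation pipelineLocation;
    final long taskId;

    TaskLocation(PipelineLocation pipelineLocation, long taskId) {
        this.pipelineLocation = pipelineLocation;
        this.taskId = taskId;
    }

    PipelineLocation getPipelineLocation() {
        return pipelineLocation;
    }
}

final class MetricsCleanupUtils {
    private MetricsCleanupUtils() {}

    // Removes every metrics entry that belongs to the given pipeline; a null map is a no-op.
    static <V> void removeMetricsEntries(
            PipelineLocation pipelineLocation, Map<TaskLocation, V> centralMap) {
        if (centralMap == null) {
            return;
        }
        centralMap.keySet().removeIf(t -> t.getPipelineLocation().equals(pipelineLocation));
    }
}
```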