dybyte commented on code in PR #9696:
URL: https://github.com/apache/seatunnel/pull/9696#discussion_r2273393678


##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java:
##########
@@ -1078,4 +1092,57 @@ public ConnectorPackageService getConnectorPackageService() {
         }
         return connectorPackageService;
     }
+
+    private void startMetricsCleanupWorker() {
+        Runnable cleanupTask =
+                () -> {
+                    Thread.currentThread().setName("metrics-cleanup-runner");
+                    while (!Thread.currentThread().isInterrupted()
+                            && !metricsCleanupRetryQueue.isEmpty()) {
+                        try {
+                            PipelineLocation pipelineLocation =
+                                    metricsCleanupRetryQueue.poll(
+                                            cleanUpRetryInterval, TimeUnit.SECONDS);

Review Comment:
   We addressed this by scheduling the task on a `ScheduledExecutorService` with
a fixed delay and by checking the time of the last cleanup before each run.
This way, even if a key is added to the queue immediately after the previous
cleanup, it is not retried right away; the retry waits for the configured
interval.
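A minimal Java sketch of the approach described above, under stated assumptions: the class `MetricsCleanupScheduler`, its `runOnce`/`pendingCount` methods, and the `Predicate`-based cleanup callback are hypothetical and not taken from the PR. `scheduleWithFixedDelay` spaces the passes by the configured interval. The `lastCleanupNanos` check additionally skips any pass that would start less than one interval after the previous one, for example if a pass were triggered outside the schedule.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

/** Hypothetical sketch: retries queued cleanups at most once per interval. */
public class MetricsCleanupScheduler<T> {
    private final BlockingQueue<T> retryQueue = new LinkedBlockingQueue<>();
    private final long intervalNanos;
    private final Predicate<T> cleanup; // returns true when the cleanup succeeded
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(
                    r -> {
                        Thread t = new Thread(r, "metrics-cleanup-runner");
                        t.setDaemon(true);
                        return t;
                    });
    private volatile long lastCleanupNanos;

    public MetricsCleanupScheduler(long interval, TimeUnit unit, Predicate<T> cleanup) {
        this.intervalNanos = unit.toNanos(interval);
        this.cleanup = cleanup;
        // Start one interval in the past so the first pass is allowed to run.
        this.lastCleanupNanos = System.nanoTime() - intervalNanos;
    }

    public boolean offer(T item) {
        return retryQueue.offer(item);
    }

    /** Runs a pass every interval, measured from the end of the previous pass. */
    public void start() {
        scheduler.scheduleWithFixedDelay(
                this::runOnce, intervalNanos, intervalNanos, TimeUnit.NANOSECONDS);
    }

    /** One cleanup pass; skipped if the previous pass started less than one interval ago. */
    void runOnce() {
        long now = System.nanoTime();
        if (now - lastCleanupNanos < intervalNanos) {
            return;
        }
        lastCleanupNanos = now;
        // Only process what was queued when the pass started, so re-queued
        // failures wait for a later pass instead of spinning.
        int batch = retryQueue.size();
        for (int i = 0; i < batch; i++) {
            T item = retryQueue.poll();
            if (item == null) {
                break;
            }
            boolean done;
            try {
                done = cleanup.test(item);
            } catch (RuntimeException e) {
                done = false;
            }
            if (!done) {
                retryQueue.offer(item); // retry on a later pass
            }
        }
    }

    int pendingCount() {
        return retryQueue.size();
    }

    public void shutdown() {
        scheduler.shutdownNow();
    }
}
```

Because each pass handles only the items that were queued when it started, a failing item goes back to the queue. It is not tried again until the next pass, one interval later.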



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java:
##########
@@ -1078,4 +1092,57 @@ public ConnectorPackageService getConnectorPackageService() {
         }
         return connectorPackageService;
     }
+
+    private void startMetricsCleanupWorker() {
+        Runnable cleanupTask =
+                () -> {
+                    Thread.currentThread().setName("metrics-cleanup-runner");
+                    while (!Thread.currentThread().isInterrupted()
+                            && !metricsCleanupRetryQueue.isEmpty()) {
+                        try {
+                            PipelineLocation pipelineLocation =
+                                    metricsCleanupRetryQueue.poll(
+                                            cleanUpRetryInterval, TimeUnit.SECONDS);
+
+                            if (pipelineLocation != null) {
+                                JobMaster jobMaster = getJobMaster(pipelineLocation.getJobId());
+                                if (jobMaster != null) {
+                                    jobMaster.removeMetricsContext(
+                                            pipelineLocation,
+                                            (PipelineStatus)
+                                                    runningJobStateIMap.get(pipelineLocation));
+                                } else {
+                                    retryRemoveMetricsContext(pipelineLocation);
+                                }
+                            }
+
+                        } catch (InterruptedException e) {
+                            Thread.currentThread().interrupt();
+                            logger.info("Metrics cleanup worker interrupted, exiting...");
+                        } catch (Exception e) {
+                            logger.warning(
+                                    String.format(
+                                            "Metrics cleanup retry failed: %s", e.getMessage()));
+                        }
+                    }
+                };
+
+        executorService.submit(cleanupTask);
+    }
+
+    private void retryRemoveMetricsContext(PipelineLocation pipelineLocation) {
+        try {
+            metricsImap.compute(
+                    Constant.IMAP_RUNNING_JOB_METRICS_KEY,
+                    (key, centralMap) -> {
+                        MetricsCleanupUtils.removeMetricsEntries(pipelineLocation, centralMap);
+                        return centralMap;
+                    });
+            logger.info(
+                    String.format(
+                            "Metrics cleanup via compute for pipeline: %s", pipelineLocation));
+        } catch (Exception e) {

Review Comment:
   Currently, failed cleanup tasks are added back to the retry queue. If a
cleanup keeps failing (for example, because the lock on the metrics IMap can
never be acquired), the same pipeline location is re-queued indefinitely. Do
you think we should limit the number of retries? If so, what would be a
reasonable limit? @Hisoka-X



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java:
##########
@@ -892,25 +906,18 @@ public void removeMetricsContext(
                                 Constant.IMAP_RUNNING_JOB_METRICS_KEY, 5, TimeUnit.SECONDS);
                 if (!lockedIMap) {
                     LOGGER.severe("lock imap failed in update metrics");
+                    boolean offer = metricsCleanupRetryQueue.offer(pipelineLocation);
+                    if (!offer) {
+                        LOGGER.warning("failed to add pipelineLocation to retry queue");
+                    }
                     return;
                 }
 
                 HashMap<TaskLocation, SeaTunnelMetricsContext> centralMap =
                         metricsImap.get(Constant.IMAP_RUNNING_JOB_METRICS_KEY);
-                if (centralMap != null) {
-                    List<TaskLocation> collect =
-                            centralMap.keySet().stream()
-                                    .filter(
-                                            taskLocation -> {
-                                                return taskLocation
-                                                        .getTaskGroupLocation()
-                                                        .getPipelineLocation()
-                                                        .equals(pipelineLocation);
-                                            })
-                                    .collect(Collectors.toList());
-                    collect.forEach(centralMap::remove);
-                    metricsImap.put(Constant.IMAP_RUNNING_JOB_METRICS_KEY, centralMap);
-                }
+                MetricsCleanupUtils.removeMetricsEntries(pipelineLocation, centralMap);
+                metricsImap.put(Constant.IMAP_RUNNING_JOB_METRICS_KEY, centralMap);

Review Comment:
   Done. I moved the entry-removal logic out of `removeMetricsContext()` into
`MetricsCleanupUtils.removeMetricsEntries()`, which both `JobMaster` and
`CoordinatorService` now call.
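For reference, the removal logic taken out of `JobMaster` (the removed lines in the hunk above) can be sketched as a standalone helper. This is a stand-in built on assumptions, not the actual `MetricsCleanupUtils` code. The class name `MetricsCleanupUtilsSketch` and the `pipelineOf` extractor are hypothetical. The SeaTunnel types `TaskLocation`, `PipelineLocation`, and `SeaTunnelMetricsContext` are replaced by type parameters so the example compiles on its own.

```java
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;

/** Hypothetical stand-in for MetricsCleanupUtils; SeaTunnel types replaced by type parameters. */
public final class MetricsCleanupUtilsSketch {
    private MetricsCleanupUtilsSketch() {}

    /**
     * Removes every entry whose key belongs to the given pipeline.
     * pipelineOf plays the role of taskLocation.getTaskGroupLocation().getPipelineLocation().
     * A null map is ignored, mirroring the null check in the removed JobMaster code.
     */
    public static <K, P> void removeMetricsEntries(
            P pipelineLocation, Map<K, ?> centralMap, Function<K, P> pipelineOf) {
        if (centralMap == null) {
            return;
        }
        // Removes matching keys in place, without an intermediate list.
        centralMap.keySet().removeIf(key -> Objects.equals(pipelineOf.apply(key), pipelineLocation));
    }
}
```

Using `keySet().removeIf` deletes the matching entries in a single pass. The original stream-based code first collected the keys into a temporary list and then removed them one by one.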



-- 
This is an automated message from the Apache Git Service.