mneethiraj commented on code in PR #390:
URL: https://github.com/apache/atlas/pull/390#discussion_r2165266011


##########
webapp/src/main/java/org/apache/atlas/notification/ImportTaskListenerImpl.java:
##########
@@ -280,50 +270,30 @@ private void releaseAsyncImportSemaphore() {
     void populateRequestQueue() {
         LOG.info("==> populateRequestQueue()");
 
-        List<String> importRequests = 
asyncImportService.fetchQueuedImportRequests();
+        List<String> queuedImports = 
asyncImportService.fetchQueuedImportRequests();
+        List<String> inProgressImports = 
asyncImportService.fetchInProgressImportIds();
 
         try {
-            if (!importRequests.isEmpty()) {
-                for (String request : importRequests) {
-                    try {
-                        if (!requestQueue.offer(request, 5, TimeUnit.SECONDS)) 
{ // Wait up to 5 sec
-                            LOG.warn("populateRequestQueue(): Request {} could 
not be added to the queue", request);
-                        }
-                    } catch (InterruptedException e) {
-                        Thread.currentThread().interrupt();
-
-                        LOG.error("populateRequestQueue(): Failed to add 
requests to queue");
-
-                        break; // Exit loop on interruption
-                    }
-                }
-
-                LOG.info("populateRequestQueue(): Added {} requests to queue", 
importRequests.size());
-            } else {
+            if (queuedImports.isEmpty() && inProgressImports.isEmpty()) {
                 LOG.warn("populateRequestQueue(): No queued requests found.");
+                return;
             }

Review Comment:
   I suggest adding following log showing the number of requests loaded:
   ```
   LOG.info("loaded {} asynchronous import requests (in-progress={}, 
queued={})", (inProgressImports.size() + queuedImports.size()), 
inProgressImports.size(), queuedImports.size());
   ```



##########
webapp/src/main/java/org/apache/atlas/notification/ImportTaskListenerImpl.java:
##########
@@ -153,7 +162,48 @@ public void onCompleteImportRequest(String importId) {
         }
     }
 
-    private void startNextImportInQueue() {
+    @PreDestroy
+    public void stopImport() {
+        LOG.info("Shutting down import processor...");
+
+        executorService.shutdown(); // Initiate an orderly shutdown
+
+        try {
+            if (!executorService.awaitTermination(30, TimeUnit.SECONDS)) {
+                LOG.warn("Executor service did not terminate gracefully within 
the timeout. Waiting longer...");
+
+                // Retry shutdown before forcing it
+                if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
+                    LOG.warn("Forcing shutdown...");
+
+                    executorService.shutdownNow();
+                }
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+
+            LOG.error("Shutdown interrupted. Forcing shutdown...");
+
+            executorService.shutdownNow();
+        }
+
+        LOG.info("Import processor stopped.");
+    }
+
+    @VisibleForTesting
+    void startInternal() {
+        populateRequestQueue();
+
+        CompletableFuture.runAsync(this::startNextImportInQueue)

Review Comment:
   When  `requestQueue` is empty, it is not necessary to call 
`startNextImportInQueue()`. This will save unnecessary attempts to load the 
next request - in `getNextImportFromQueue()`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to