mneethiraj commented on code in PR #390:
URL: https://github.com/apache/atlas/pull/390#discussion_r2165266011
##########
webapp/src/main/java/org/apache/atlas/notification/ImportTaskListenerImpl.java:
##########
@@ -280,50 +270,30 @@ private void releaseAsyncImportSemaphore() {
void populateRequestQueue() {
LOG.info("==> populateRequestQueue()");
- List<String> importRequests = asyncImportService.fetchQueuedImportRequests();
+ List<String> queuedImports = asyncImportService.fetchQueuedImportRequests();
+ List<String> inProgressImports = asyncImportService.fetchInProgressImportIds();
try {
- if (!importRequests.isEmpty()) {
- for (String request : importRequests) {
- try {
- if (!requestQueue.offer(request, 5, TimeUnit.SECONDS)) { // Wait up to 5 sec
- LOG.warn("populateRequestQueue(): Request {} could not be added to the queue", request);
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
-
- LOG.error("populateRequestQueue(): Failed to add requests to queue");
-
- break; // Exit loop on interruption
- }
- }
-
- LOG.info("populateRequestQueue(): Added {} requests to queue", importRequests.size());
- } else {
+ if (queuedImports.isEmpty() && inProgressImports.isEmpty()) {
LOG.warn("populateRequestQueue(): No queued requests found.");
+ return;
}
Review Comment:
I suggest adding the following log to show the number of requests loaded:
```
LOG.info("loaded {} asynchronous import requests (in-progress={}, queued={})", (inProgressImports.size() + queuedImports.size()), inProgressImports.size(), queuedImports.size());
```
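To illustrate what the suggested log line would report, here is a minimal standalone sketch. It assumes plain lists stand in for the results of `fetchInProgressImportIds()` and `fetchQueuedImportRequests()`; the class `ImportLoadSummary` and the helper `summarize` are hypothetical and not part of the PR.

```java
import java.util.Arrays;
import java.util.List;

public class ImportLoadSummary {
    // Hypothetical helper: builds the same text the suggested LOG.info call would emit.
    static String summarize(List<String> inProgressImports, List<String> queuedImports) {
        int inProgress = inProgressImports.size();
        int queued     = queuedImports.size();

        return String.format("loaded %d asynchronous import requests (in-progress=%d, queued=%d)",
                inProgress + queued, inProgress, queued);
    }

    public static void main(String[] args) {
        // Placeholder ids; real values would come from asyncImportService.
        List<String> inProgress = Arrays.asList("import-1");
        List<String> queued     = Arrays.asList("import-2", "import-3");

        System.out.println(summarize(inProgress, queued));
    }
}
```

Logging the total alongside both parts makes it easy to tell from a single line whether a restart resumed interrupted imports or only picked up queued ones.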
##########
webapp/src/main/java/org/apache/atlas/notification/ImportTaskListenerImpl.java:
##########
@@ -153,7 +162,48 @@ public void onCompleteImportRequest(String importId) {
}
}
- private void startNextImportInQueue() {
+ @PreDestroy
+ public void stopImport() {
+ LOG.info("Shutting down import processor...");
+
+ executorService.shutdown(); // Initiate an orderly shutdown
+
+ try {
+ if (!executorService.awaitTermination(30, TimeUnit.SECONDS)) {
+ LOG.warn("Executor service did not terminate gracefully within the timeout. Waiting longer...");
+
+ // Retry shutdown before forcing it
+ if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
+ LOG.warn("Forcing shutdown...");
+
+ executorService.shutdownNow();
+ }
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+
+ LOG.error("Shutdown interrupted. Forcing shutdown...");
+
+ executorService.shutdownNow();
+ }
+
+ LOG.info("Import processor stopped.");
+ }
+
+ @VisibleForTesting
+ void startInternal() {
+ populateRequestQueue();
+
+ CompletableFuture.runAsync(this::startNextImportInQueue)
Review Comment:
When `requestQueue` is empty, there is no need to call
`startNextImportInQueue()`. Skipping the call avoids an unnecessary attempt to
load the next request in `getNextImportFromQueue()`.
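One possible shape for such a guard, as a rough sketch only: the class `StartGuardSketch`, the `startAttempts` counter, and the `boolean` return value are hypothetical and exist to make the example self-contained. The `join()` call is added so the sketch behaves deterministically; the code in the PR does not wait for the future.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class StartGuardSketch {
    final BlockingQueue<String> requestQueue  = new LinkedBlockingQueue<>();
    final AtomicInteger         startAttempts = new AtomicInteger();

    // Stand-in for startNextImportInQueue(); it only counts invocations.
    void startNextImportInQueue() {
        startAttempts.incrementAndGet();
    }

    // Returns true if processing was kicked off, false if the queue was empty.
    boolean startInternal() {
        if (requestQueue.isEmpty()) {
            return false; // nothing loaded, so skip the async start entirely
        }

        // join() is only here to keep the sketch deterministic.
        CompletableFuture.runAsync(this::startNextImportInQueue).join();

        return true;
    }

    public static void main(String[] args) {
        StartGuardSketch sketch = new StartGuardSketch();

        System.out.println(sketch.startInternal()); // empty queue: no start attempt

        sketch.requestQueue.add("import-1");

        System.out.println(sketch.startInternal()); // one request queued: start attempted
    }
}
```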