jackhalfalltrades commented on code in PR #307:
URL: https://github.com/apache/atlas/pull/307#discussion_r1996037516


##########
webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java:
##########
@@ -454,63 +485,85 @@ void startInternal(Configuration configuration, 
ExecutorService executorService)
 
         if (!HAConfiguration.isHAEnabled(configuration)) {
             LOG.info("HA is disabled, starting consumers inline.");
+            startHookConsumers(executorService);
+        }
+    }
 
-            startConsumers(executorService);
+    public void startImportNotificationConsumer(NotificationType 
notificationType, String importId, String topic) {
+        if (topic != null) {
+            notificationInterface.addTopicToNotificationType(notificationType, 
topic);
         }
+        List<NotificationConsumer<HookNotification>> notificationConsumers = 
notificationInterface.createConsumers(notificationType, 1);
+        List<HookConsumer> hookConsumers = new ArrayList<>();
+        for (final NotificationConsumer<HookNotification> consumer : 
notificationConsumers) {
+            String hookConsumerName = ATLAS_IMPORT_CONSUMER_THREAD_PREFIX + 
importId;
+            HookConsumer hookConsumer = new HookConsumer(hookConsumerName, 
consumer);
+            hookConsumers.add(hookConsumer);
+        }
+        startConsumers(executors, hookConsumers);
     }
 
-    private void startConsumers(ExecutorService executorService) {
+    private void startHookConsumers(ExecutorService executorService) {
         int                                                           
numThreads                  = 
applicationProperties.getInt(CONSUMER_THREADS_PROPERTY, 1);
         Map<NotificationConsumer<HookNotification>, NotificationType> 
notificationConsumersByType = new HashMap<>();
-
         List<NotificationConsumer<HookNotification>> notificationConsumers = 
notificationInterface.createConsumers(NotificationType.HOOK, numThreads);
-
         for (NotificationConsumer<HookNotification> notificationConsumer : 
notificationConsumers) {
             notificationConsumersByType.put(notificationConsumer, 
NotificationType.HOOK);
         }
-
         if (AtlasHook.isHookMsgsSortEnabled) {
             List<NotificationConsumer<HookNotification>> 
unsortedNotificationConsumers = 
notificationInterface.createConsumers(NotificationType.HOOK_UNSORTED, 
numThreads);
 
             for (NotificationConsumer<HookNotification> 
unsortedNotificationConsumer : unsortedNotificationConsumers) {
                 notificationConsumersByType.put(unsortedNotificationConsumer, 
NotificationType.HOOK_UNSORTED);
             }
         }
-
-        if (executorService == null) {
-            executorService = 
Executors.newFixedThreadPool(notificationConsumersByType.size(), new 
ThreadFactoryBuilder().setNameFormat(THREADNAME_PREFIX + " thread-%d").build());
-        }
-
-        executors = executorService;
-
+        List<HookConsumer> hookConsumers = new ArrayList<>();
         for (final NotificationConsumer<HookNotification> consumer : 
notificationConsumersByType.keySet()) {
             String hookConsumerName = ATLAS_HOOK_CONSUMER_THREAD_NAME;
-
             if 
(notificationConsumersByType.get(consumer).equals(NotificationType.HOOK_UNSORTED))
 {
                 hookConsumerName = ATLAS_HOOK_UNSORTED_CONSUMER_THREAD_NAME;
             }
-
             HookConsumer hookConsumer = new HookConsumer(hookConsumerName, 
consumer);
+            hookConsumers.add(hookConsumer);
+        }
+        startConsumers(executorService, hookConsumers);
+    }
 
-            consumers.add(hookConsumer);
-            executors.submit(hookConsumer);
+    private void startConsumers(ExecutorService executorService, 
List<HookConsumer> hookConsumers) {
+        if (consumers == null) {
+            consumers = new ArrayList<>();
+        }
+        if (executorService == null) {
+            executorService = new ThreadPoolExecutor(
+                    0, // Core pool size
+                    Integer.MAX_VALUE, // Maximum pool size (dynamic scaling)
+                    60L, TimeUnit.SECONDS, // Idle thread timeout
+                    new SynchronousQueue<>(), // Direct handoff queue
+                    new ThreadFactoryBuilder().setNameFormat(THREADNAME_PREFIX 
+ " thread-%d").build());
+        }
+        executors = executorService;

Review Comment:
   In this new approach the executors are overWritten in 
   startConsumers() - only when the passed in executorService is null 
(essentially at the start of the instance or start of the first async import 
request). Please correct me f I am wrong. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@atlas.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to