This is an automated email from the ASF dual-hosted git repository.

mblow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit f1b9c83b4d54987c4729abdf308eea7b9deb1064
Author: Michael Blow <[email protected]>
AuthorDate: Tue Oct 29 17:39:54 2024 -0400

    [NO ISSUE] Misc fixes
    
    - Synchronize ActiveNotificationHandler.handle(), as it accesses members
      which require synchronization to be safe
    - Avoid spinning on interrupt in SingleThreadEventProcessor
    
    Ext-ref: MB-63390
    Change-Id: I89efb0662885dc8013eaf44ecb43e866186964ae
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19026
    Reviewed-by: Michael Blow <[email protected]>
    Reviewed-by: Ali Alsuliman <[email protected]>
    Tested-by: Michael Blow <[email protected]>
---
 .../asterix/app/active/ActiveNotificationHandler.java   | 17 +++++++++++------
 .../hyracks/api/util/SingleThreadEventProcessor.java    | 12 ++++++++----
 2 files changed, 19 insertions(+), 10 deletions(-)

diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
index 0f2780f51e..b619cfec5a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 import org.apache.asterix.active.ActiveEvent;
 import org.apache.asterix.active.ActiveEvent.Kind;
@@ -68,23 +69,27 @@ public class ActiveNotificationHandler extends SingleThreadEventProcessor<Active
 
     @Override
     protected void handle(ActiveEvent event) {
+        resolveListenerForEvent(event).ifPresent(listener -> listener.notify(event));
+    }
+
+    private synchronized Optional<IActiveEntityEventsListener> resolveListenerForEvent(ActiveEvent event) {
         JobId jobId = event.getJobId();
         Kind eventKind = event.getEventKind();
         EntityId entityId = jobId2EntityId.get(jobId);
+        IActiveEntityEventsListener listener = null;
         if (entityId != null) {
-            IActiveEntityEventsListener listener = entityEventListeners.get(entityId);
+            listener = entityEventListeners.get(entityId);
             if (eventKind == Kind.JOB_FINISHED) {
                 LOGGER.debug("removing ingestion job {}", jobId);
                 jobId2EntityId.remove(jobId);
             }
-            if (listener != null) {
-                listener.notify(event);
-            } else {
-                LOGGER.debug("listener not found for entity {} on event={}", entityId, event);
+            if (listener == null) {
+                LOGGER.debug("listener not found for entity {} on event {} for job {}", entityId, event, jobId);
             }
         } else {
-            LOGGER.log(Level.ERROR, "Entity not found for event {} for job {}", eventKind, jobId);
+            LOGGER.log(Level.ERROR, "entity not found for event {} for job {}", eventKind, jobId);
         }
+        return Optional.ofNullable(listener);
     }
 
     // *** IJobLifecycleListener
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/SingleThreadEventProcessor.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/SingleThreadEventProcessor.java
index 85019c57c5..b07cb712ed 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/SingleThreadEventProcessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/SingleThreadEventProcessor.java
@@ -31,7 +31,7 @@ public abstract class SingleThreadEventProcessor<T> implements Runnable {
     private static final Logger LOGGER = LogManager.getLogger();
     protected final String name;
     private final LinkedBlockingQueue<T> eventInbox;
-    private volatile Thread executorThread;
+    private final Thread executorThread;
     private volatile boolean stopped = false;
 
     public SingleThreadEventProcessor(String threadName) {
@@ -43,18 +43,22 @@ public abstract class SingleThreadEventProcessor<T> implements Runnable {
 
     @Override
     public final void run() {
-        LOGGER.log(Level.INFO, "Started " + Thread.currentThread().getName());
+        LOGGER.info("Started {}", name);
         while (!stopped) {
             try {
                 T event = eventInbox.take();
                 handle(event);
             } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
+                if (!stopped) {
+                    LOGGER.warn("Interrupt while waiting for an event and !stopped");
+                }
+                break;
             } catch (Exception e) {
-                LOGGER.log(Level.ERROR, "Error handling an event", e);
+                LOGGER.error("Error handling an event", e);
             }
         }
-        LOGGER.log(Level.WARN, "Stopped " + Thread.currentThread().getName());
+        LOGGER.info("Stopped {}", name);
     }
 
     protected abstract void handle(T event) throws Exception; //NOSONAR

Reply via email to