This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit f1b9c83b4d54987c4729abdf308eea7b9deb1064 Author: Michael Blow <[email protected]> AuthorDate: Tue Oct 29 17:39:54 2024 -0400 [NO ISSUE] Misc fixes - Synchronize ActiveNotificationHandler.handle(), as it accesses members which require synchronization to be safe - Avoid spinning on interrupt in SingleThreadEventProcessor Ext-ref: MB-63390 Change-Id: I89efb0662885dc8013eaf44ecb43e866186964ae Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19026 Reviewed-by: Michael Blow <[email protected]> Reviewed-by: Ali Alsuliman <[email protected]> Tested-by: Michael Blow <[email protected]> --- .../asterix/app/active/ActiveNotificationHandler.java | 17 +++++++++++------ .../hyracks/api/util/SingleThreadEventProcessor.java | 12 ++++++++---- 2 files changed, 19 insertions(+), 10 deletions(-) diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java index 0f2780f51e..b619cfec5a 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java @@ -23,6 +23,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import org.apache.asterix.active.ActiveEvent; import org.apache.asterix.active.ActiveEvent.Kind; @@ -68,23 +69,27 @@ public class ActiveNotificationHandler extends SingleThreadEventProcessor<Active @Override protected void handle(ActiveEvent event) { + resolveListenerForEvent(event).ifPresent(listener -> listener.notify(event)); + } + + private synchronized Optional<IActiveEntityEventsListener> resolveListenerForEvent(ActiveEvent event) { JobId jobId = event.getJobId(); Kind eventKind = event.getEventKind(); EntityId entityId = jobId2EntityId.get(jobId); + IActiveEntityEventsListener listener = null; if (entityId != null) { - IActiveEntityEventsListener listener = entityEventListeners.get(entityId); + listener = entityEventListeners.get(entityId); if (eventKind == Kind.JOB_FINISHED) { LOGGER.debug("removing ingestion job {}", jobId); jobId2EntityId.remove(jobId); } - if (listener != null) { - listener.notify(event); - } else { - LOGGER.debug("listener not found for entity {} on event={}", entityId, event); + if (listener == null) { + LOGGER.debug("listener not found for entity {} on event {} for job {}", entityId, event, jobId); } } else { - LOGGER.log(Level.ERROR, "Entity not found for event {} for job {}", eventKind, jobId); + LOGGER.log(Level.ERROR, "entity not found for event {} for job {}", eventKind, jobId); } + return Optional.ofNullable(listener); } // *** IJobLifecycleListener diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/SingleThreadEventProcessor.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/SingleThreadEventProcessor.java index 85019c57c5..b07cb712ed 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/SingleThreadEventProcessor.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/SingleThreadEventProcessor.java @@ -31,7 +31,7 @@ public abstract class SingleThreadEventProcessor<T> implements Runnable { private static final Logger LOGGER = LogManager.getLogger(); protected final String name; private final LinkedBlockingQueue<T> eventInbox; - private volatile Thread executorThread; + private final Thread executorThread; private volatile boolean stopped = false; public SingleThreadEventProcessor(String threadName) { @@ -43,18 +43,22 @@ public abstract class SingleThreadEventProcessor<T> implements Runnable { @Override public final void run() { - LOGGER.log(Level.INFO, "Started " + Thread.currentThread().getName()); + LOGGER.info("Started {}", name); while (!stopped) { try { T event = eventInbox.take(); handle(event); } catch (InterruptedException e) { Thread.currentThread().interrupt(); + if (!stopped) { + LOGGER.warn("Interrupt while waiting for an event and !stopped"); + } + break; } catch (Exception e) { - LOGGER.log(Level.ERROR, "Error handling an event", e); + LOGGER.error("Error handling an event", e); } } - LOGGER.log(Level.WARN, "Stopped " + Thread.currentThread().getName()); + LOGGER.info("Stopped {}", name); } protected abstract void handle(T event) throws Exception; //NOSONAR
