Repository: hadoop
Updated Branches:
  refs/heads/trunk 2eb597b15 -> f6bb1ca3c
HDDS-513. Check if the EventQueue is not closed before executing handlers. Contributed by Nanda Kumar.

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f6bb1ca3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f6bb1ca3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f6bb1ca3

Branch: refs/heads/trunk
Commit: f6bb1ca3c16eb5825188b51d45f32be111266d88
Parents: 2eb597b
Author: Bharat Viswanadham <[email protected]>
Authored: Wed Sep 19 14:35:29 2018 -0700
Committer: Bharat Viswanadham <[email protected]>
Committed: Wed Sep 19 14:35:29 2018 -0700

----------------------------------------------------------------------
 .../hadoop/hdds/server/events/EventQueue.java | 18 ++++++++++++++++++
 1 file changed, 18 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6bb1ca3/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java
index b2b0df2..9aeab7b 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java
@@ -55,6 +55,8 @@ public class EventQueue implements EventPublisher, AutoCloseable {
 
   private final AtomicLong eventCount = new AtomicLong(0);
 
+  private boolean isRunning = true;
+
   public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void addHandler(
       EVENT_TYPE event, EventHandler<PAYLOAD> handler) {
     this.addHandler(event, handler, generateHandlerName(handler));
@@ -116,6 +118,10 @@ public class EventQueue implements EventPublisher, AutoCloseable {
   public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void addHandler(
       EVENT_TYPE event, EventExecutor<PAYLOAD> executor,
       EventHandler<PAYLOAD> handler) {
+    if (!isRunning) {
+      LOG.warn("Not adding handler for {}, EventQueue is not running", event);
+      return;
+    }
     validateEvent(event);
     executors.putIfAbsent(event, new HashMap<>());
     executors.get(event).putIfAbsent(executor, new ArrayList<>());
@@ -136,6 +142,11 @@ public class EventQueue implements EventPublisher, AutoCloseable {
   public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void fireEvent(
       EVENT_TYPE event, PAYLOAD payload) {
 
+    if (!isRunning) {
+      LOG.warn("Processing of {} is skipped, EventQueue is not running", event);
+      return;
+    }
+
     Map<EventExecutor, List<EventHandler>> eventExecutorListMap =
         this.executors.get(event);
 
@@ -187,6 +198,11 @@ public class EventQueue implements EventPublisher, AutoCloseable {
     long currentTime = Time.now();
     while (true) {
 
+      if (!isRunning) {
+        LOG.warn("Processing of event skipped. EventQueue is not running");
+        return;
+      }
+
       long processed = 0;
 
       Stream<EventExecutor> allExecutor = this.executors.values().stream()
@@ -216,6 +232,8 @@ public class EventQueue implements EventPublisher, AutoCloseable {
 
   public void close() {
 
+    isRunning = false;
+
     Set<EventExecutor> allExecutors = this.executors.values().stream()
         .flatMap(handlerMap -> handlerMap.keySet().stream())
         .collect(Collectors.toSet());

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
