This is an automated email from the ASF dual-hosted git repository. orudyy pushed a commit to branch 7.1.x in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git
commit 95a8379c41417a5de3fb4bc828bb0efda06491d3 Author: Alex Rudyy <oru...@apache.org> AuthorDate: Sun Dec 1 18:00:00 2019 +0000 QPID-8387: [Broker-J] Handle exceptions thrown on asynchromous message removal in JDBC-based message stores (cherry picked from commit a597b9051f67287ab64cd7c1a966bfeab239088d) --- .../store/jdbc/AbstractJDBCMessageStore.java | 75 ++++++++++++++++++---- 1 file changed, 62 insertions(+), 13 deletions(-) diff --git a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java index df02e55..e1181b2 100644 --- a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java +++ b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java @@ -38,6 +38,7 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicBoolean; @@ -108,6 +109,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore private final Set<StoredJDBCMessage<?>> _messages = Collections.newSetFromMap(new ConcurrentHashMap<>()); private final Set<MessageDeleteListener> _messageDeleteListeners = Collections.newSetFromMap(new ConcurrentHashMap<>()); private final Set<Action<Connection>> _deleteActions = Collections.newSetFromMap(new ConcurrentHashMap<>()); + private final Thread.UncaughtExceptionHandler _uncaughtExceptionHandler; protected abstract boolean isMessageStoreOpen(); @@ -117,6 +119,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore public AbstractJDBCMessageStore() { + _uncaughtExceptionHandler = Thread.getDefaultUncaughtExceptionHandler(); } protected void setMaximumMessageId() @@ -453,21 +456,67 @@ public abstract class AbstractJDBCMessageStore implements MessageStore { if(_messageRemovalScheduled.compareAndSet(false, true)) { - _executor.submit(() -> { - List<Long> messageIds; - do - { - messageIds = _messagesToDelete.getAndSet(EMPTY_LIST); - removeMessages(messageIds); - } while(!messageIds.isEmpty()); - + try + { + _executor.submit(this::removeScheduledMessages); + } + catch (RejectedExecutionException e) + { _messageRemovalScheduled.set(false); - if(!_messagesToDelete.get().isEmpty()) - { - scheduleMessageRemoval(); - } + throw new IllegalStateException("Cannot schedule removal of messages", e); + } + } + } - }); + private void removeScheduledMessages() + { + try + { + removeScheduledMessagesAndRescheduleIfRequired(); + } + catch (RuntimeException e) + { + handleExceptionOnScheduledMessageRemoval(e); + } + } + + private void removeScheduledMessagesAndRescheduleIfRequired() + { + List<Long> messageIds; + try + { + do + { + messageIds = _messagesToDelete.getAndSet(EMPTY_LIST); + removeMessages(messageIds); + } while (!messageIds.isEmpty()); + } + finally + { + _messageRemovalScheduled.set(false); + } + if (!_messagesToDelete.get().isEmpty() && isMessageStoreOpen()) + { + scheduleMessageRemoval(); + } + } + + private void handleExceptionOnScheduledMessageRemoval(final RuntimeException e) + { + if (isMessageStoreOpen()) + { + if (_uncaughtExceptionHandler == null) + { + getLogger().error("Unexpected exception on asynchronous message removal", e); + } + else + { + _uncaughtExceptionHandler.uncaughtException(Thread.currentThread(), e); + } + } + else + { + getLogger().warn("Ignoring unexpected exception on asynchronous message removal as store is not open", e); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org