This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a commit to branch 7.1.x
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git

commit 95a8379c41417a5de3fb4bc828bb0efda06491d3
Author: Alex Rudyy <oru...@apache.org>
AuthorDate: Sun Dec 1 18:00:00 2019 +0000

    QPID-8387: [Broker-J] Handle exceptions thrown on asynchromous message 
removal in JDBC-based message stores
    
    (cherry picked from commit a597b9051f67287ab64cd7c1a966bfeab239088d)
---
 .../store/jdbc/AbstractJDBCMessageStore.java       | 75 ++++++++++++++++++----
 1 file changed, 62 insertions(+), 13 deletions(-)

diff --git 
a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
 
b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
index df02e55..e1181b2 100644
--- 
a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
+++ 
b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
@@ -38,6 +38,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -108,6 +109,7 @@ public abstract class AbstractJDBCMessageStore implements 
MessageStore
     private final Set<StoredJDBCMessage<?>> _messages = 
Collections.newSetFromMap(new ConcurrentHashMap<>());
     private final Set<MessageDeleteListener> _messageDeleteListeners = 
Collections.newSetFromMap(new ConcurrentHashMap<>());
     private final Set<Action<Connection>> _deleteActions = 
Collections.newSetFromMap(new ConcurrentHashMap<>());
+    private final Thread.UncaughtExceptionHandler _uncaughtExceptionHandler;
 
     protected abstract boolean isMessageStoreOpen();
 
@@ -117,6 +119,7 @@ public abstract class AbstractJDBCMessageStore implements 
MessageStore
 
     public AbstractJDBCMessageStore()
     {
+        _uncaughtExceptionHandler = 
Thread.getDefaultUncaughtExceptionHandler();
     }
 
     protected void setMaximumMessageId()
@@ -453,21 +456,67 @@ public abstract class AbstractJDBCMessageStore implements 
MessageStore
     {
         if(_messageRemovalScheduled.compareAndSet(false, true))
         {
-            _executor.submit(() -> {
-                List<Long> messageIds;
-                do
-                {
-                    messageIds = _messagesToDelete.getAndSet(EMPTY_LIST);
-                    removeMessages(messageIds);
-                } while(!messageIds.isEmpty());
-
+            try
+            {
+                _executor.submit(this::removeScheduledMessages);
+            }
+            catch (RejectedExecutionException e)
+            {
                 _messageRemovalScheduled.set(false);
-                if(!_messagesToDelete.get().isEmpty())
-                {
-                    scheduleMessageRemoval();
-                }
+                throw new IllegalStateException("Cannot schedule removal of 
messages", e);
+            }
+        }
+    }
 
-            });
+    private void removeScheduledMessages()
+    {
+        try
+        {
+            removeScheduledMessagesAndRescheduleIfRequired();
+        }
+        catch (RuntimeException e)
+        {
+            handleExceptionOnScheduledMessageRemoval(e);
+        }
+    }
+
+    private void removeScheduledMessagesAndRescheduleIfRequired()
+    {
+        List<Long> messageIds;
+        try
+        {
+            do
+            {
+                messageIds = _messagesToDelete.getAndSet(EMPTY_LIST);
+                removeMessages(messageIds);
+            } while (!messageIds.isEmpty());
+        }
+        finally
+        {
+            _messageRemovalScheduled.set(false);
+        }
+        if (!_messagesToDelete.get().isEmpty() && isMessageStoreOpen())
+        {
+            scheduleMessageRemoval();
+        }
+    }
+
+    private void handleExceptionOnScheduledMessageRemoval(final 
RuntimeException e)
+    {
+        if (isMessageStoreOpen())
+        {
+            if (_uncaughtExceptionHandler == null)
+            {
+                getLogger().error("Unexpected exception on asynchronous 
message removal", e);
+            }
+            else
+            {
+                
_uncaughtExceptionHandler.uncaughtException(Thread.currentThread(), e);
+            }
+        }
+        else
+        {
+            getLogger().warn("Ignoring unexpected exception on asynchronous 
message removal as store is not open", e);
         }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to