Repository: activemq-artemis Updated Branches: refs/heads/master 2f52e3ce2 -> 036e24460
ARTEMIS-2159 Fixing OpenWire Blocker Producer. Previous change on Flow control in OpenWire broke Blocked cases. This is a better fix. Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/c6214680 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/c6214680 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/c6214680 Branch: refs/heads/master Commit: c62146802ed1a8a3128e1e3b847a02eac247aa01 Parents: 2f52e3c Author: Clebert Suconic <[email protected]> Authored: Thu Nov 1 15:33:03 2018 -0400 Committer: Clebert Suconic <[email protected]> Committed: Thu Nov 1 15:43:17 2018 -0400 ---------------------------------------------------------------------- .../core/protocol/openwire/amq/AMQSession.java | 100 +++++++++---------- .../artemis/core/paging/PagingStore.java | 2 + .../core/paging/impl/PagingStoreImpl.java | 8 +- .../storage/PersistMultiThreadTest.java | 5 + 4 files changed, 63 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c6214680/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java index a107ba7..0429297 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java @@ -443,63 +443,63 @@ public class AMQSession 
implements SessionCallback { final AtomicInteger count, final org.apache.activemq.artemis.api.core.Message coreMsg, final SimpleString address) throws ResourceAllocationException { - if (!store.checkMemory(null)) { - this.connection.getContext().setDontSendReponse(false); - connection.enableTtl(); - throw new ResourceAllocationException("Queue is full " + address); - } + if (!store.checkMemory(false, () -> { + Exception exceptionToSend = null; - Exception exceptionToSend = null; - - try { - getCoreSession().send(coreMsg, false, dest.isTemporary()); - } catch (Exception e) { - logger.warn(e.getMessage(), e); - exceptionToSend = e; - } - connection.enableTtl(); - if (count == null || count.decrementAndGet() == 0) { - if (exceptionToSend != null) { - this.connection.getContext().setDontSendReponse(false); - connection.sendException(exceptionToSend); - } else { - server.getStorageManager().afterCompleteOperations(new IOCallback() { - @Override - public void done() { - if (sendProducerAck) { - try { - ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize()); - connection.dispatchAsync(ack); - } catch (Exception e) { + try { + getCoreSession().send(coreMsg, false, dest.isTemporary()); + } catch (Exception e) { + logger.warn(e.getMessage(), e); + exceptionToSend = e; + } + connection.enableTtl(); + if (count == null || count.decrementAndGet() == 0) { + if (exceptionToSend != null) { + this.connection.getContext().setDontSendReponse(false); + connection.sendException(exceptionToSend); + } else { + server.getStorageManager().afterCompleteOperations(new IOCallback() { + @Override + public void done() { + if (sendProducerAck) { + try { + ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize()); + connection.dispatchAsync(ack); + } catch (Exception e) { + connection.getContext().setDontSendReponse(false); + ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); + connection.sendException(e); + } + } else { 
connection.getContext().setDontSendReponse(false); - ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); - connection.sendException(e); - } - } else { - connection.getContext().setDontSendReponse(false); - try { - Response response = new Response(); - response.setCorrelationId(messageSend.getCommandId()); - connection.dispatchAsync(response); - } catch (Exception e) { - ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); - connection.sendException(e); + try { + Response response = new Response(); + response.setCorrelationId(messageSend.getCommandId()); + connection.dispatchAsync(response); + } catch (Exception e) { + ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); + connection.sendException(e); + } } } - } - @Override - public void onError(int errorCode, String errorMessage) { - try { - final IOException e = new IOException(errorMessage); - ActiveMQServerLogger.LOGGER.warn(errorMessage); - connection.serviceException(e); - } catch (Exception ex) { - ActiveMQServerLogger.LOGGER.debug(ex); + @Override + public void onError(int errorCode, String errorMessage) { + try { + final IOException e = new IOException(errorMessage); + ActiveMQServerLogger.LOGGER.warn(errorMessage); + connection.serviceException(e); + } catch (Exception ex) { + ActiveMQServerLogger.LOGGER.debug(ex); + } } - } - }); + }); + } } + })) { + this.connection.getContext().setDontSendReponse(false); + connection.enableTtl(); + throw new ResourceAllocationException("Queue is full " + address); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c6214680/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java index 38c59dc..5a290ba 100644 --- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java @@ -127,6 +127,8 @@ public interface PagingStore extends ActiveMQComponent, RefCountMessageListener boolean checkMemory(Runnable runnable); + boolean checkMemory(boolean runOnFailure, Runnable runnable); + boolean isFull(); boolean isRejectingMessages(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c6214680/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java index 908ab9f..89cd5b6 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java @@ -652,13 +652,17 @@ public class PagingStoreImpl implements PagingStore { } } - @Override public boolean checkMemory(final Runnable runWhenAvailable) { + return checkMemory(true, runWhenAvailable); + } + + @Override + public boolean checkMemory(boolean runOnFailure, final Runnable runWhenAvailable) { if (addressFullMessagePolicy == AddressFullMessagePolicy.FAIL && (maxSize != -1 || usingGlobalMaxSize || pagingManager.isDiskFull())) { if (isFull()) { - if (runWhenAvailable != null) { + if (runOnFailure && runWhenAvailable != null) { onMemoryFreedRunnables.add(AtomicRunnable.checkAtomic(runWhenAvailable)); } return false; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c6214680/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java ---------------------------------------------------------------------- 
diff --git a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java index 813765d..420c278 100644 --- a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java +++ b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java @@ -411,6 +411,11 @@ public class PersistMultiThreadTest extends ActiveMQTestBase { } @Override + public boolean checkMemory(boolean runOnFailure, Runnable runnable) { + return false; + } + + @Override public boolean checkMemory(Runnable runnable) { return false; }
