Github user clebertsuconic commented on a diff in the pull request:
https://github.com/apache/activemq-artemis/pull/907#discussion_r93466239
--- Diff:
artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
---
@@ -303,108 +301,103 @@ public void send(final ProducerInfo producerInfo,
originalCoreMsg.putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(),
messageSend.getMessageId().toString());
}
- Runnable runnable;
-
- if (sendProducerAck) {
- runnable = new Runnable() {
- @Override
- public void run() {
- try {
- ProducerAck ack = new
ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
- connection.dispatchSync(ack);
- } catch (Exception e) {
- ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
- connection.sendException(e);
- }
-
- }
- };
- } else {
- final Connection transportConnection =
connection.getTransportConnection();
-
- if (transportConnection == null) {
- // I don't think this could happen, but just in case, avoiding
races
- runnable = null;
- } else {
- runnable = new Runnable() {
- @Override
- public void run() {
- transportConnection.setAutoRead(true);
- }
- };
- }
- }
-
- internalSend(actualDestinations, originalCoreMsg, runnable);
- }
+ boolean shouldBlockProducer = producerInfo.getWindowSize() > 0 ||
messageSend.isResponseRequired();
- private void internalSend(ActiveMQDestination[] actualDestinations,
- ServerMessage originalCoreMsg,
- final Runnable onComplete) throws Exception {
+ final AtomicInteger count = new
AtomicInteger(actualDestinations.length);
- Runnable runToUse;
+ final Exception[] anyException = new Exception[] {null};
- if (actualDestinations.length <= 1 || onComplete == null) {
- // if onComplete is null, this will be null ;)
- runToUse = onComplete;
- } else {
- final AtomicInteger count = new
AtomicInteger(actualDestinations.length);
- runToUse = new Runnable() {
- @Override
- public void run() {
- if (count.decrementAndGet() == 0) {
- onComplete.run();
- }
- }
- };
+ if (shouldBlockProducer) {
+ connection.getContext().setDontSendReponse(true);
}
- SimpleString[] addresses = new
SimpleString[actualDestinations.length];
- PagingStore[] pagingStores = new
PagingStore[actualDestinations.length];
-
- // We fillup addresses, pagingStores and we will throw failure if
that's the case
for (int i = 0; i < actualDestinations.length; i++) {
ActiveMQDestination dest = actualDestinations[i];
- addresses[i] = new SimpleString(dest.getPhysicalName());
- pagingStores[i] =
server.getPagingManager().getPageStore(addresses[i]);
- if (pagingStores[i].getAddressFullMessagePolicy() ==
AddressFullMessagePolicy.FAIL && pagingStores[i].isFull()) {
- throw new ResourceAllocationException("Queue is full");
- }
- }
-
- for (int i = 0; i < actualDestinations.length; i++) {
-
+ SimpleString address = new SimpleString(dest.getPhysicalName());
ServerMessage coreMsg = originalCoreMsg.copy();
-
- coreMsg.setAddress(addresses[i]);
-
- PagingStore store = pagingStores[i];
-
- if (store.isFull()) {
- connection.getTransportConnection().setAutoRead(false);
- }
+ coreMsg.setAddress(address);
if (actualDestinations[i].isQueue()) {
checkAutoCreateQueue(new
SimpleString(actualDestinations[i].getPhysicalName()),
actualDestinations[i].isTemporary());
- }
-
- if (actualDestinations[i].isQueue()) {
coreMsg.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE,
RoutingType.ANYCAST.getType());
} else {
coreMsg.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE,
RoutingType.MULTICAST.getType());
}
- RoutingStatus result = getCoreSession().send(coreMsg, false,
actualDestinations[i].isTemporary());
+ PagingStore store =
server.getPagingManager().getPageStore(address);
- if (result == RoutingStatus.NO_BINDINGS &&
actualDestinations[i].isQueue()) {
- throw new InvalidDestinationException("Cannot publish to a
non-existent Destination: " + actualDestinations[i]);
- }
- if (runToUse != null) {
- // if the timeout is >0, it will wait this much milliseconds
- // before running the the runToUse
- // this will eventually unblock blocked destinations
- // playing flow control
- store.checkMemory(runToUse);
+ this.connection.disableTtl();
+ if (shouldBlockProducer) {
+ if (!store.checkMemory(() -> {
+ try {
+ RoutingStatus result = getCoreSession().send(coreMsg,
false, dest.isTemporary());
+
+ if (result == RoutingStatus.NO_BINDINGS &&
dest.isQueue()) {
+ throw new InvalidDestinationException("Cannot publish
to a non-existent Destination: " + dest);
+ }
+ } catch (Exception e) {
+ if (anyException[0] == null) {
+ anyException[0] = e;
--- End diff --
remove these..
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---