[
https://issues.apache.org/jira/browse/ARTEMIS-883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15743593#comment-15743593
]
ASF GitHub Bot commented on ARTEMIS-883:
----------------------------------------
Github user gaohoward commented on a diff in the pull request:
https://github.com/apache/activemq-artemis/pull/907#discussion_r92070977
--- Diff:
artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
---
@@ -302,108 +301,91 @@ public void send(final ProducerInfo producerInfo,
originalCoreMsg.putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(),
messageSend.getMessageId().toString());
}
- Runnable runnable;
-
- if (sendProducerAck) {
- runnable = new Runnable() {
- @Override
- public void run() {
- try {
- ProducerAck ack = new
ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
- connection.dispatchSync(ack);
- } catch (Exception e) {
- ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
- connection.sendException(e);
- }
-
- }
- };
- } else {
- final Connection transportConnection =
connection.getTransportConnection();
-
- if (transportConnection == null) {
- // I don't think this could happen, but just in case, avoiding
races
- runnable = null;
- } else {
- runnable = new Runnable() {
- @Override
- public void run() {
- transportConnection.setAutoRead(true);
- }
- };
- }
- }
-
- internalSend(actualDestinations, originalCoreMsg, runnable);
- }
+ boolean shouldBlockProducer = producerInfo.getWindowSize() > 0 ||
messageSend.isResponseRequired();
- private void internalSend(ActiveMQDestination[] actualDestinations,
- ServerMessage originalCoreMsg,
- final Runnable onComplete) throws Exception {
+ final AtomicInteger count = new
AtomicInteger(actualDestinations.length);
- Runnable runToUse;
+ final Exception[] anyException = new Exception[] {null};
- if (actualDestinations.length <= 1 || onComplete == null) {
- // if onComplete is null, this will be null ;)
- runToUse = onComplete;
- } else {
- final AtomicInteger count = new
AtomicInteger(actualDestinations.length);
- runToUse = new Runnable() {
- @Override
- public void run() {
- if (count.decrementAndGet() == 0) {
- onComplete.run();
- }
- }
- };
+ if (shouldBlockProducer) {
+ connection.getContext().setDontSendReponse(true);
}
- SimpleString[] addresses = new
SimpleString[actualDestinations.length];
- PagingStore[] pagingStores = new
PagingStore[actualDestinations.length];
-
- // We fillup addresses, pagingStores and we will throw failure if
that's the case
for (int i = 0; i < actualDestinations.length; i++) {
ActiveMQDestination dest = actualDestinations[i];
- addresses[i] = new SimpleString(dest.getPhysicalName());
- pagingStores[i] =
server.getPagingManager().getPageStore(addresses[i]);
- if (pagingStores[i].getAddressFullMessagePolicy() ==
AddressFullMessagePolicy.FAIL && pagingStores[i].isFull()) {
- throw new ResourceAllocationException("Queue is full");
- }
- }
-
- for (int i = 0; i < actualDestinations.length; i++) {
+ SimpleString address = new SimpleString(dest.getPhysicalName());
+ PagingStore store =
server.getPagingManager().getPageStore(address);
ServerMessage coreMsg = originalCoreMsg.copy();
+ coreMsg.setAddress(address);
- coreMsg.setAddress(addresses[i]);
-
- PagingStore store = pagingStores[i];
-
- if (store.isFull()) {
- connection.getTransportConnection().setAutoRead(false);
- }
+ if (shouldBlockProducer) {
- if (actualDestinations[i].isQueue()) {
- checkAutoCreateQueue(new
SimpleString(actualDestinations[i].getPhysicalName()),
actualDestinations[i].isTemporary());
- }
+ if (!store.checkMemory(() -> {
+ try {
+ RoutingStatus result = getCoreSession().send(coreMsg,
false, dest.isTemporary());
- if (actualDestinations[i].isQueue()) {
-
coreMsg.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE,
RoutingType.ANYCAST.getType());
+ if (result == RoutingStatus.NO_BINDINGS &&
dest.isQueue()) {
+ throw new InvalidDestinationException("Cannot publish
to a non-existent Destination: " + dest);
--- End diff --
I don't quite understand this. In this case the client should either get
the exception immediately (the callback is called directly on the calling thread),
or block waiting for a response (the callback is called on a possibly different
thread).
> Fix OpenWire ProducerFlowControlTest Regression
> -----------------------------------------------
>
> Key: ARTEMIS-883
> URL: https://issues.apache.org/jira/browse/ARTEMIS-883
> Project: ActiveMQ Artemis
> Issue Type: Bug
> Components: OpenWire
> Affects Versions: 1.5.1
> Reporter: Howard Gao
> Assignee: Howard Gao
> Fix For: 1.5.next
>
>
> ProducerFlowControlTest fails due to a regression
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)