This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new 858a824 ARTEMIS-2659 / ARTEMIS-2673 Fix AmqpSendReceiveTest.testAcceptWithoutSettling
new 7febd8c This closes #3071
858a824 is described below
commit 858a8240f97f7cd2fc3bca052ff74a97c0bf886c
Author: Clebert Suconic <[email protected]>
AuthorDate: Tue Apr 7 22:43:37 2020 -0400
ARTEMIS-2659 / ARTEMIS-2673 Fix AmqpSendReceiveTest.testAcceptWithoutSettling
---
.../amqp/proton/ProtonServerSenderContext.java | 42 ++++++----------------
.../tests/integration/amqp/JMSSelectorTest.java | 2 +-
2 files changed, 11 insertions(+), 33 deletions(-)
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index b9385bc..24d0bcb 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -629,7 +629,16 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
// this can happen in the twice ack mode, that is the receiver accepts and settles separately
// acking again would show an exception but would have no negative effect but best to handle anyway.
if (!delivery.isSettled()) {
- inSessionACK(delivery, message);
+ // we have to individual ack as we can't guarantee we will get the delivery updates
+ // (including acks) in order from dealer, a performance hit but a must
+ try {
+ sessionSPI.ack(null, brokerConsumer, message);
+ } catch (Exception e) {
+ log.warn(e.toString(), e);
+ throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage());
+ }
+
+ delivery.settle();
}
} else {
handleExtendedDeliveryOutcomes(message, delivery, remoteState);
@@ -644,37 +653,6 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
}
}
- private void inSessionACK(Delivery delivery, Message message) throws ActiveMQAMQPIllegalStateException {
- OperationContext oldContext = sessionSPI.recoverContext();
- try {
- // we have to individual ack as we can't guarantee we will get the delivery updates
- // (including acks) in order from dealer, a performance hit but a must
- try {
- sessionSPI.ack(null, brokerConsumer, message);
- } catch (Exception e) {
- log.warn(e.toString(), e);
- throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage());
- }
-
- sessionSPI.afterIO(new IOCallback() {
- @Override
- public void done() {
- connection.runLater(() -> {
- delivery.settle();
- connection.instantFlush();
- });
- }
-
- @Override
- public void onError(int errorCode, String errorMessage) {
-
- }
- });
- } finally {
- sessionSPI.resetContext(oldContext);
- }
- }
-
private boolean handleExtendedDeliveryOutcomes(Message message, Delivery delivery, DeliveryState remoteState) throws ActiveMQAMQPException {
boolean settleImmediate = true;
boolean handled = true;
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSelectorTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSelectorTest.java
index c61898f..56c83d2 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSelectorTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSelectorTest.java
@@ -131,7 +131,7 @@ public class JMSSelectorTest extends JMSClientTestSupport {
TextMessage msg = (TextMessage) consumer.receive(1000);
assertNotNull(msg);
assertEquals("how are you", msg.getText());
- assertNull(consumer.receive(1000));
+ assertNull(consumer.receiveNoWait());
consumer.close();
}
}