[
https://issues.apache.org/jira/browse/AMQ-8322?focusedWorklogId=793249&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-793249
]
ASF GitHub Bot logged work on AMQ-8322:
---------------------------------------
Author: ASF GitHub Bot
Created on: 20/Jul/22 13:37
Start Date: 20/Jul/22 13:37
Worklog Time Spent: 10m
Work Description: mattrpav commented on code in PR #729:
URL: https://github.com/apache/activemq/pull/729#discussion_r925618032
##########
activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2AckModesTest.java:
##########
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.jms2;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSConsumer;
+import javax.jms.JMSContext;
+import javax.jms.Message;
+
+import org.apache.activemq.ActiveMQSession;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(value = Parameterized.class)
+public class ActiveMQJMS2AckModesTest extends ActiveMQJMS2TestBase {
+
+ private final String destinationName;
+ private final String destinationType;
+ private final int ackMode;
+ private final String messagePayload;
+
+ public ActiveMQJMS2AckModesTest(String destinationType, int ackMode) {
+ this.destinationName = "AMQ.JMS2.ACKMODE." + Integer.toString(ackMode)
+ destinationType.toUpperCase();
+ this.destinationType = destinationType;
+ this.ackMode = ackMode;
+ this.messagePayload = "Test message destType: " + destinationType + "
ackMode: " + Integer.toString(ackMode);
+ }
+
+ @Parameterized.Parameters(name="destinationType={0}, ackMode={1}")
+ public static Collection<Object[]> data() {
+ return Arrays.asList(new Object[][] {
+ {"queue", ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE },
+ {"queue", ActiveMQSession.AUTO_ACKNOWLEDGE },
+ {"queue", ActiveMQSession.CLIENT_ACKNOWLEDGE },
+ {"queue", ActiveMQSession.DUPS_OK_ACKNOWLEDGE },
+ {"queue", ActiveMQSession.SESSION_TRANSACTED },
+ {"topic", ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE },
+ {"topic", ActiveMQSession.AUTO_ACKNOWLEDGE },
+ {"topic", ActiveMQSession.CLIENT_ACKNOWLEDGE },
+ {"topic", ActiveMQSession.DUPS_OK_ACKNOWLEDGE },
+ {"topic", ActiveMQSession.SESSION_TRANSACTED },
+ {"temp-queue", ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE },
+ {"temp-queue", ActiveMQSession.AUTO_ACKNOWLEDGE },
+ {"temp-queue", ActiveMQSession.CLIENT_ACKNOWLEDGE },
+ {"temp-queue", ActiveMQSession.DUPS_OK_ACKNOWLEDGE },
+ {"temp-queue", ActiveMQSession.SESSION_TRANSACTED },
+ {"temp-topic", ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE },
+ {"temp-topic", ActiveMQSession.AUTO_ACKNOWLEDGE },
+ {"temp-topic", ActiveMQSession.CLIENT_ACKNOWLEDGE },
+ {"temp-topic", ActiveMQSession.DUPS_OK_ACKNOWLEDGE },
+ {"temp-topic", ActiveMQSession.SESSION_TRANSACTED }
+ });
+ }
+
+ @Test
+ public void testAcknowledgementMode() {
+
+ try(JMSContext jmsContext =
activemqConnectionFactory.createContext("admin", "admin", ackMode)) {
+ assertNotNull(jmsContext);
+ Destination destination =
ActiveMQJMS2TestSupport.generateDestination(jmsContext, destinationType,
destinationName);
+ assertNotNull(destination);
+ JMSConsumer jmsConsumer = jmsContext.createConsumer(destination);
+ assertNotNull(jmsConsumer);
+ jmsContext.start();
+
+ Message message =
ActiveMQJMS2TestSupport.generateMessage(jmsContext, "text", messagePayload);
+
+ List<String> sentMessageIds = new LinkedList<>();
+ for(int deliveryMode : Arrays.asList(DeliveryMode.NON_PERSISTENT,
DeliveryMode.PERSISTENT)) {
+
sentMessageIds.add(ActiveMQJMS2TestSupport.sendMessage(jmsContext, destination,
message, null, deliveryMode, null, null, null, null, null, null, null));
+
+ switch(ackMode) {
+ case ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE:
message.acknowledge(); break;
+ default: break;
+ }
+ }
+
+ // For session and client ack we ack after all messages are sent
+ switch(ackMode) {
+ case ActiveMQSession.CLIENT_ACKNOWLEDGE: message.acknowledge();
break;
+ case ActiveMQSession.SESSION_TRANSACTED: jmsContext.commit();
break;
+ default: break;
+ }
+
+ Message recvMessage = null;
+ List<Message> recvMessages = new LinkedList<>();
+ int loopCount = 0;
+ int maxLoops = 50;
+ boolean done = false;
+ do {
+ recvMessage = jmsConsumer.receive(500l);
+ if(recvMessage != null) {
+ recvMessages.add(recvMessage);
+
+ switch(ackMode) {
+ case ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE:
recvMessage.acknowledge(); break;
+ default: break;
+ }
+ }
+
+ if(recvMessages.size() == 2) {
+ done = true;
+ }
+ loopCount++;
+ } while (loopCount <= maxLoops && !done);
+
+ switch(ackMode) {
+ case ActiveMQSession.CLIENT_ACKNOWLEDGE:
recvMessage.acknowledge(); break;
+ case ActiveMQSession.SESSION_TRANSACTED: jmsContext.commit();
break;
+ default: break;
+ }
+
+ int foundCount = 0;
+ for(int validDeliveryMode :
Arrays.asList(DeliveryMode.NON_PERSISTENT, DeliveryMode.PERSISTENT)) {
+ for(javax.jms.Message tmpMessage : recvMessages) {
+ if(tmpMessage.getJMSDeliveryMode() == validDeliveryMode) {
+
ActiveMQJMS2TestSupport.validateMessageDeliveryMode(tmpMessage, "text",
messagePayload, validDeliveryMode);
+ foundCount++;
+ }
+ }
+ }
+ assertEquals(Integer.valueOf(2), Integer.valueOf(foundCount));
+ jmsConsumer.close();
+
+ // Add new consumer to ensure all messages have been consumed
+ JMSConsumer jmsConsumerEmpty =
jmsContext.createConsumer(destination);
+ assertNull(jmsConsumerEmpty.receive(500l));
Review Comment:
Using the MXBean proved to be flaky on fast computers: there is a race
condition in the ack reaching the store, which produces incorrect
(queueSize > 0) values via the management interface.
Issue Time Tracking
-------------------
Worklog Id: (was: 793249)
Time Spent: 16h (was: 15h 50m)
> Implement JMS 2.0 Connection createContext methods
> --------------------------------------------------
>
> Key: AMQ-8322
> URL: https://issues.apache.org/jira/browse/AMQ-8322
> Project: ActiveMQ
> Issue Type: New Feature
> Reporter: Matt Pavlovich
> Assignee: Matt Pavlovich
> Priority: Major
> Labels: #jms2
> Fix For: 5.18.0
>
> Time Spent: 16h
> Remaining Estimate: 0h
>
> Add support for JMSContext, JMSProducer and JMSConsumer for working with
> queues
--
This message was sent by Atlassian Jira
(v8.20.10#820010)