gemmellr commented on a change in pull request #3728:
URL: https://github.com/apache/activemq-artemis/pull/3728#discussion_r705423912
##########
File path:
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
##########
@@ -476,4 +487,48 @@ public void postAcknowledge(MessageReference ref,
AckReason reason) {
public void sendMessage(Message message, RoutingContext context,
List<MessageReference> refs) {
}
+ // I need a supress warning annotation here
Review comment:
suppress
##########
File path:
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
##########
@@ -380,10 +394,7 @@ private void performAck(String nodeID, long messageID,
Queue targetQueue, ACKMes
logger.warn(e.getMessage(), e);
}
} else {
- if (logger.isDebugEnabled()) {
- logger.debug("Post ack Server " + server + " could not find
messageID = " + messageID +
- " representing nodeID=" + nodeID);
- }
+ performAckOnPage(nodeID, messageID, targetQueue, ackMessageOperation);
Review comment:
The JIRA is light on detail, but im guessing the issue was that if a
message is mirrored but gets paged, and the ack operation message subsequently
shows up but before the message makes it onto the relevant queue, the message
may not get removed and would get put and then remain on the queue?
If so the ordering here seems like it could still present an issue. Does
anything stop the possibility that the related message wasnt on the queue
during the attempts to process the ack operation, but by the time those
attempts to remove it from the queue have completed unsuccessfully, the message
is removed from the paging store and put on the queue, before the attempt to
remove it from paging store begins or completes, thus leaving it still orphaned
on the queue?
##########
File path:
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
##########
@@ -112,6 +112,139 @@ public AtomicInteger getScheduledCleanupCount() {
// Each CursorIterator will record their current PageReader in this map
private final ConcurrentLongHashMap<PageReader> pageReaders = new
ConcurrentLongHashMap<>();
+ private final AtomicInteger scheduledScanCount = new AtomicInteger(0);
+
+ private final LinkedList<PageScan> scanList = new LinkedList();
+
+ private static class PageScan {
+ final Comparable<PagedReference> scanFunction;
+ final Runnable found;
+ final Runnable notfound;
+
+ public Comparable<PagedReference> getScanFunction() {
+ return scanFunction;
+ }
+
+ public Runnable getFound() {
+ return found;
+ }
+
+ public Runnable getNotfound() {
+ return notfound;
+ }
+
+ PageScan(Comparable<PagedReference> scanFunction, Runnable found,
Runnable notfound) {
+ this.scanFunction = scanFunction;
+ this.found = found;
+ this.notfound = notfound;
+ }
+ }
+
+ @Override
+ public void addScanAck(Comparable<PagedReference> scanFunction, Runnable
found, Runnable notfound) {
+ PageScan scan = new PageScan(scanFunction, found, notfound);
+ synchronized (scanList) {
+ scanList.add(scan);
+ }
+ }
+
+ @Override
+ public void performScanAck() {
+ // we should only have a max of 2 scheduled tasks
+ // one that's might still be currently running, and another one lined up
+ // no need for more than that
+ if (scheduledScanCount.incrementAndGet() < 2) {
+ executor.execute(this::actualScanAck);
+ } else {
+ scheduledScanCount.decrementAndGet();
+ }
+ }
+
+ private void actualScanAck() {
+ try {
+ PageScan[] localScanList;
+ synchronized (scanList) {
+ if (scanList.size() == 0) {
+ return;
+ }
+ localScanList = scanList.stream().toArray(i -> new PageScan[i]);
+ scanList.clear();
+ }
+
+ LinkedList<Runnable> afterCommitList = new LinkedList<>();
+ TransactionImpl tx = new TransactionImpl(store);
+ tx.addOperation(new TransactionOperationAbstract() {
+ @Override
+ public void afterCommit(Transaction tx) {
+ for (Runnable r : afterCommitList) {
+ try {
+ r.run();
+ } catch (Throwable e) {
+ logger.warn(e.getMessage(), e);
+ }
+ }
+ }
+ });
+ PageIterator iterator = this.iterator(true);
+ try {
+ while (iterator.hasNext()) {
+ PagedReference reference = iterator.next();
+ boolean keepMoving = false;
+ for (int i = 0; i < localScanList.length; i++) {
+ PageScan scanElemen = localScanList[i];
Review comment:
scanElement
##########
File path:
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
##########
@@ -476,4 +487,48 @@ public void postAcknowledge(MessageReference ref,
AckReason reason) {
public void sendMessage(Message message, RoutingContext context,
List<MessageReference> refs) {
}
+ // I need a supress warning annotation here
+ // because errorProne is issuing an error her, however I really intend to
compare PageACK against PagedReference
+ @SuppressWarnings("ComparableType")
Review comment:
Just having the scanFunction be an IntFunction (rather than
not-entirely-Comparable) seems like it might be nicer, avoid potential issues
later and compiler/IDE warnings now (e.g I still get one that "ComparableType"
isnt a supported SuppressWarnings)
##########
File path:
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/PagedMirrorTest.java
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <br>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <br>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.amqp.connect;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.io.File;
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
+import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
+import
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.jboss.logging.Logger;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class PagedMirrorTest extends ActiveMQTestBase {
+
+ private static final Logger logger =
Logger.getLogger(PagedMirrorTest.class);
+ ActiveMQServer server1;
+
+ ActiveMQServer server2;
+
+ @Before
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+
+ server1 = createServer(true, createDefaultConfig(0, true), 1024, 10 *
1024);
+ server1.getConfiguration().getAcceptorConfigurations().clear();
+ server1.getConfiguration().addAcceptorConfiguration("server",
"tcp://localhost:61616");
+ AMQPBrokerConnectConfiguration brokerConnectConfiguration = new
AMQPBrokerConnectConfiguration("other",
"tcp://localhost:61617").setReconnectAttempts(-1).setRetryInterval(1000);
+ brokerConnectConfiguration.addElement(new
AMQPMirrorBrokerConnectionElement());
+ server1.getConfiguration().addAMQPConnection(brokerConnectConfiguration);
+
+ server2 = createServer(true, createDefaultConfig(1, true), 1024, 10 *
1024);
+ server2.getConfiguration().getAcceptorConfigurations().clear();
+ server2.getConfiguration().addAcceptorConfiguration("server",
"tcp://localhost:61617");
+ brokerConnectConfiguration = new AMQPBrokerConnectConfiguration("other",
"tcp://localhost:61616").setReconnectAttempts(-1).setRetryInterval(1000);
+ brokerConnectConfiguration.addElement(new
AMQPMirrorBrokerConnectionElement());
+ server2.getConfiguration().addAMQPConnection(brokerConnectConfiguration);
+
+ server1.start();
+ server2.start();
+ }
+
+ @Test
+ public void testPaged() throws Throwable {
+ String sendURI = "tcp://localhost:61616";
+ String consumeURI = "tcp://localhost:61616";
+ String secondConsumeURI = "tcp://localhost:61617";
+
+ Wait.waitFor(() -> server1.locateQueue("$ACTIVEMQ_ARTEMIS_MIRROR_other")
!= null);
+
+ org.apache.activemq.artemis.core.server.Queue snf1 =
server2.locateQueue("$ACTIVEMQ_ARTEMIS_MIRROR_other");
+ Assert.assertNotNull(snf1);
+
+ org.apache.activemq.artemis.core.server.Queue snf2 =
server1.locateQueue("$ACTIVEMQ_ARTEMIS_MIRROR_other");
+ Assert.assertNotNull(snf2);
+
+ File countJournalLocation =
server1.getConfiguration().getJournalLocation();
+ Assert.assertTrue(countJournalLocation.exists() &&
countJournalLocation.isDirectory());
+ String protocol = "amqp";
+
+ ConnectionFactory sendCF = CFUtil.createConnectionFactory(protocol,
sendURI);
+ ConnectionFactory consumeCF = CFUtil.createConnectionFactory(protocol,
consumeURI);
+ ConnectionFactory secondConsumeCF =
CFUtil.createConnectionFactory(protocol, secondConsumeURI);
+
+ String bodyBuffer;
+ {
+ StringBuffer buffer = new StringBuffer();
+ for (int i = 0; i < 1024; i++) {
+ buffer.append("*");
+ }
+ bodyBuffer = buffer.toString();
+ }
+
+ int NUMBER_OF_MESSAGES = 200;
+ int ACK_I = 77;
+
+ try (Connection sendConnecton = sendCF.createConnection()) {
+ Session sendSession = sendConnecton.createSession(true,
Session.SESSION_TRANSACTED);
+ Queue jmsQueue = sendSession.createQueue("someQueue");
+ MessageProducer producer = sendSession.createProducer(jmsQueue);
+
+ for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+ TextMessage message = sendSession.createTextMessage(bodyBuffer);
+ message.setIntProperty("i", i);
+ producer.send(message);
+ }
+ sendSession.commit();
+ }
+
+ Wait.assertEquals(0, snf1::getMessageCount);
+ Wait.assertEquals(0, snf2::getMessageCount);
+
+ try (Connection consumeConnection = consumeCF.createConnection()) {
+ Session consumeSession = consumeConnection.createSession(false, 101);
// individual ack
+ Queue jmsQueue = consumeSession.createQueue("someQueue");
+ MessageConsumer consumer = consumeSession.createConsumer(jmsQueue);
+ consumeConnection.start();
+ for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+ TextMessage message = (TextMessage) consumer.receive(6000);
+ if (message.getIntProperty("i") == ACK_I) {
+ message.acknowledge();
+ }
+ }
+ Assert.assertNull(consumer.receiveNoWait());
+ }
+ Wait.assertEquals(0, snf1::getMessageCount);
+ Wait.assertEquals(0, snf2::getMessageCount);
+ Wait.assertEquals(1, () -> acksCount(countJournalLocation), 5000, 1000);
+
+ try (Connection consumeConnection = secondConsumeCF.createConnection()) {
+ Session consumeSession = consumeConnection.createSession(true,
Session.SESSION_TRANSACTED);
+ Queue jmsQueue = consumeSession.createQueue("someQueue");
+ MessageConsumer consumer = consumeSession.createConsumer(jmsQueue);
+ consumeConnection.start();
+
+ for (int i = 0; i < NUMBER_OF_MESSAGES - 1; i++) {
+ TextMessage message = (TextMessage) consumer.receive(6000);
+ Assert.assertNotNull(message);
+ Assert.assertNotEquals(ACK_I, message.getIntProperty("i"));
Review comment:
Would be nicer to assert the others are as expected as opposed to just
not-this-one. You could make a list and use one of the assertion/matcher libs
that would compare an expected list at the end, e.g hamcrest is already in the
build.
##########
File path:
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
##########
@@ -476,4 +487,48 @@ public void postAcknowledge(MessageReference ref,
AckReason reason) {
public void sendMessage(Message message, RoutingContext context,
List<MessageReference> refs) {
}
+ // I need a supress warning annotation here
+ // because errorProne is issuing an error her, however I really intend to
compare PageACK against PagedReference
Review comment:
here
##########
File path:
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java
##########
@@ -640,25 +640,32 @@ private void replicaTest(boolean largeMessage,
server_2.startBrokerConnection(brokerConnectionName);
}
+ snfreplica = server_2.locateQueue(replica.getMirrorSNF());
Review comment:
Why do you need to repopulate the variable, shouldnt it still be the
same?
##########
File path:
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
##########
@@ -112,6 +112,139 @@ public AtomicInteger getScheduledCleanupCount() {
// Each CursorIterator will record their current PageReader in this map
private final ConcurrentLongHashMap<PageReader> pageReaders = new
ConcurrentLongHashMap<>();
+ private final AtomicInteger scheduledScanCount = new AtomicInteger(0);
+
+ private final LinkedList<PageScan> scanList = new LinkedList();
+
+ private static class PageScan {
+ final Comparable<PagedReference> scanFunction;
+ final Runnable found;
+ final Runnable notfound;
+
+ public Comparable<PagedReference> getScanFunction() {
+ return scanFunction;
+ }
+
+ public Runnable getFound() {
+ return found;
+ }
+
+ public Runnable getNotfound() {
+ return notfound;
+ }
+
+ PageScan(Comparable<PagedReference> scanFunction, Runnable found,
Runnable notfound) {
+ this.scanFunction = scanFunction;
+ this.found = found;
+ this.notfound = notfound;
Review comment:
notFound (plus all the other places similarly)
##########
File path:
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/PagedMirrorTest.java
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <br>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <br>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.amqp.connect;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.io.File;
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
+import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
+import
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.jboss.logging.Logger;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class PagedMirrorTest extends ActiveMQTestBase {
+
+ private static final Logger logger =
Logger.getLogger(PagedMirrorTest.class);
+ ActiveMQServer server1;
+
+ ActiveMQServer server2;
+
+ @Before
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+
+ server1 = createServer(true, createDefaultConfig(0, true), 1024, 10 *
1024);
+ server1.getConfiguration().getAcceptorConfigurations().clear();
+ server1.getConfiguration().addAcceptorConfiguration("server",
"tcp://localhost:61616");
+ AMQPBrokerConnectConfiguration brokerConnectConfiguration = new
AMQPBrokerConnectConfiguration("other",
"tcp://localhost:61617").setReconnectAttempts(-1).setRetryInterval(1000);
+ brokerConnectConfiguration.addElement(new
AMQPMirrorBrokerConnectionElement());
+ server1.getConfiguration().addAMQPConnection(brokerConnectConfiguration);
+
+ server2 = createServer(true, createDefaultConfig(1, true), 1024, 10 *
1024);
+ server2.getConfiguration().getAcceptorConfigurations().clear();
+ server2.getConfiguration().addAcceptorConfiguration("server",
"tcp://localhost:61617");
+ brokerConnectConfiguration = new AMQPBrokerConnectConfiguration("other",
"tcp://localhost:61616").setReconnectAttempts(-1).setRetryInterval(1000);
+ brokerConnectConfiguration.addElement(new
AMQPMirrorBrokerConnectionElement());
+ server2.getConfiguration().addAMQPConnection(brokerConnectConfiguration);
+
+ server1.start();
+ server2.start();
+ }
+
+ @Test
+ public void testPaged() throws Throwable {
+ String sendURI = "tcp://localhost:61616";
+ String consumeURI = "tcp://localhost:61616";
+ String secondConsumeURI = "tcp://localhost:61617";
+
+ Wait.waitFor(() -> server1.locateQueue("$ACTIVEMQ_ARTEMIS_MIRROR_other")
!= null);
+
+ org.apache.activemq.artemis.core.server.Queue snf1 =
server2.locateQueue("$ACTIVEMQ_ARTEMIS_MIRROR_other");
Review comment:
May have been clearer not to name both connections "other".
Also seems odd to wait for the server1 queue to appear and then locate the
server2 queue first. Seems like you would wait on and locate each in turn.
##########
File path:
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java
##########
@@ -85,9 +85,17 @@
// for internal (cursor) classes
void confirmPosition(Transaction tx, PagePosition position) throws
Exception;
- /**
- * @return the first page in use or MAX_LONG if none is in use
- */
+
+ // Add a scan function to be performed. It will be completed when you call
performScan
Review comment:
performScanAck
##########
File path:
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java
##########
@@ -845,23 +852,31 @@ private void consumeMessages(boolean largeMessage,
int LAST_ID,
int port,
boolean assertNull) throws JMSException {
- ConnectionFactory cf = CFUtil.createConnectionFactory("AMQP",
"tcp://localhost:" + port);
+ ConnectionFactory cf = CFUtil.createConnectionFactory("AMQP",
"tcp://localhost:" + port + "?jms.prefetchPolicy.all=0");
Review comment:
Is this needed when there is only one consumer at a time?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]