gemmellr commented on code in PR #5158:
URL: https://github.com/apache/activemq-artemis/pull/5158#discussion_r1723429249
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java:
##########
@@ -201,12 +219,31 @@ public void flow() {
creditRunnable.run();
}
+
@Override
protected void actualDelivery(Message message, Delivery delivery,
DeliveryAnnotations deliveryAnnotations, Receiver receiver, Transaction tx) {
recoverContext();
+
+
+
OperationContextImpl.getContext().setSyncReplication(configuration.isMirrorReplicaSync());
Review Comment:
Does this ever need to be reset? I.e., could it leak to anything on the thread
later?
##########
tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/brokerConnection/MirrorInfiniteRetryReplicaTest.java:
##########
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.smoke.brokerConnection;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.io.File;
+import java.io.StringWriter;
+import java.lang.invoke.MethodHandles;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.management.SimpleManagement;
+import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
+import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.util.ServerUtil;
+import org.apache.activemq.artemis.utils.FileUtil;
+import org.apache.activemq.artemis.utils.TestParameters;
+import org.apache.activemq.artemis.utils.Wait;
+import org.apache.activemq.artemis.utils.cli.helper.HelperCreate;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class MirrorInfiniteRetryReplicaTest extends SmokeTestBase {
+
+ private static final String TEST_NAME = "LATE_RETRY_MIRROR_SOAK";
+
+ private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ // Set this to true and log4j will be configured with some relevant
log.trace for the AckManager at the server's
+ private static final boolean REUSE_SERVERS =
Boolean.parseBoolean(TestParameters.testProperty(TEST_NAME, "REUSE_SERVERS",
"false"));
+
+ /*
+ * Time each consumer takes to process a message received to allow some
messages accumulating.
+ * This sleep happens right before the commit.
+ */
+ private static final String QUEUE_NAME = "queueTest";
+
+ private static String body;
+
+ static {
+ StringWriter writer = new StringWriter();
+ while (writer.getBuffer().length() < 30 * 1024) {
+ writer.append("The sky is blue, ..... watch out for poop from the
birds though!...");
+ }
+ body = writer.toString();
+ }
+
+ public static final String DC1_NODE = "AckLateRetrySoakTest/DC1";
+ public static final String DC2_NODE = "AckLateRetrySoakTest/DC2";
+ public static final String DC2_REPLICA_NODE =
"AckLateRetrySoakTest/DC2_REPLICA";
+ public static final String DC1_REPLICA_NODE =
"AckLateRetrySoakTest/DC1_REPLICA";
+
+ volatile Process processDC1;
+ volatile Process processDC2;
+ volatile Process processDC1_REPLICA;
+ volatile Process processDC2_REPLICA;
+
+ @AfterEach
+ public void destroyServers() throws Exception {
+ if (processDC2_REPLICA != null) {
+ processDC2_REPLICA.destroyForcibly();
+ processDC2_REPLICA.waitFor(1, TimeUnit.MINUTES);
+ processDC2_REPLICA = null;
+ }
+ if (processDC1_REPLICA != null) {
+ processDC1_REPLICA.destroyForcibly();
+ processDC1_REPLICA.waitFor(1, TimeUnit.MINUTES);
+ processDC1_REPLICA = null;
+ }
+ if (processDC1 != null) {
+ processDC1.destroyForcibly();
+ processDC1.waitFor(1, TimeUnit.MINUTES);
+ processDC1 = null;
+ }
+ if (processDC2 != null) {
+ processDC2.destroyForcibly();
+ processDC2.waitFor(1, TimeUnit.MINUTES);
+ processDC2 = null;
+ }
+ }
+
+ private static final String DC1_IP = "localhost:61616";
+ private static final String DC1_BACKUP_IP = "localhost:61617";
+ private static final String DC2_IP = "localhost:61618";
+ private static final String DC2_BACKUP_IP = "localhost:61619";
+
+ private static String uri(String ip) {
+ return "tcp://" + ip;
+ }
+
+ private static String uriWithAlternate(String ip, String alternate) {
+ return "tcp://" + ip + "#tcp://" + alternate;
+ }
+
+ private static void createMirroredServer(String serverName,
+ String connectionName,
+ String mirrorURI,
+ int porOffset,
Review Comment:
portOffset?
##########
artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java:
##########
@@ -582,6 +582,7 @@ public void testFileConfiguration() {
assertEquals(222, conf.getMirrorAckManagerPageAttempts());
assertEquals(333, conf.getMirrorAckManagerRetryDelay());
assertTrue(conf.isMirrorPageTransaction());
+ assertFalse(conf.isMirrorReplicaSync());
Review Comment:
Missing an opposing check of the default (e.g. in
DefaultsFileConfigurationTest)
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java:
##########
@@ -3413,6 +3418,18 @@ public ConfigurationImpl
setMirrorAckManagerRetryDelay(int delay) {
return this;
}
+ @Override
+ public boolean isMirrorReplicaSync() {
+ return mirrorReplicaSync;
+ }
+
+ @Override
+ public ConfigurationImpl setMirrorReplicaSync(boolean replicaSync) {
+ logger.info("setMirrorReplicaSync {}", replicaSync);
Review Comment:
debug?
##########
artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextUnitTest.java:
##########
@@ -292,6 +292,97 @@ public void done() {
}
}
+ @Test
+ public void testIgnoreReplication() throws Exception {
+ ExecutorService executor =
Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory(getClass().getName()));
Review Comment:
Could these tests work with a 'same thread executor' (e.g. Runnable::run),
just to verify that the executor is not called when not expected, and then all
the work is done when expected, without any of the waits?
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java:
##########
@@ -201,12 +219,31 @@ public void flow() {
creditRunnable.run();
}
+
Review Comment:
superfluous newline
##########
tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/brokerConnection/MirrorInfiniteRetryReplicaTest.java:
##########
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.smoke.brokerConnection;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.io.File;
+import java.io.StringWriter;
+import java.lang.invoke.MethodHandles;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.management.SimpleManagement;
+import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
+import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.util.ServerUtil;
+import org.apache.activemq.artemis.utils.FileUtil;
+import org.apache.activemq.artemis.utils.TestParameters;
+import org.apache.activemq.artemis.utils.Wait;
+import org.apache.activemq.artemis.utils.cli.helper.HelperCreate;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class MirrorInfiniteRetryReplicaTest extends SmokeTestBase {
+
+ private static final String TEST_NAME = "LATE_RETRY_MIRROR_SOAK";
+
+ private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ // Set this to true and log4j will be configured with some relevant
log.trace for the AckManager at the server's
+ private static final boolean REUSE_SERVERS =
Boolean.parseBoolean(TestParameters.testProperty(TEST_NAME, "REUSE_SERVERS",
"false"));
+
+ /*
+ * Time each consumer takes to process a message received to allow some
messages accumulating.
+ * This sleep happens right before the commit.
+ */
+ private static final String QUEUE_NAME = "queueTest";
Review Comment:
Neither of the comments seems to match its related code that well / at all.
Queue name could be better.
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java:
##########
@@ -95,6 +97,22 @@ public static MirrorController getControllerInUse() {
return CONTROLLER_THREAD_LOCAL.get();
}
+ /** The rate in milliseconds that we will print OperationContext debug
information on the mirror target */
+ private static final int DEBUG_CONTEXT_RATE;
+ private ScheduledFuture scheduledRateDebug = null;
+
+ static {
+ int rate;
+ try {
+ rate =
Integer.parseInt(System.getProperty(AMQPMirrorControllerTarget.class.getName()
+ ".DEBUG_CONTEXT_RATE", "5000"));
+ } catch (Throwable e) {
+ logger.debug(e.getMessage(), e);
+ rate = 0;
+ }
+
+ DEBUG_CONTEXT_RATE = rate;
Review Comment:
As this is setting the period / interval rather than a rate, perhaps
DEBUG_CONTEXT_PERIOD?
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java:
##########
@@ -201,12 +219,31 @@ public void flow() {
creditRunnable.run();
}
+
@Override
protected void actualDelivery(Message message, Delivery delivery,
DeliveryAnnotations deliveryAnnotations, Receiver receiver, Transaction tx) {
recoverContext();
+
+
+
OperationContextImpl.getContext().setSyncReplication(configuration.isMirrorReplicaSync());
+
+ if (logger.isDebugEnabled()) { // no need to schedule rate debug if no
debug allowed
+ if (DEBUG_CONTEXT_RATE > 0 && scheduledRateDebug == null) {
+ OperationContextImpl context = (OperationContextImpl)
OperationContextImpl.getContext();
+ scheduledRateDebug =
server.getScheduledPool().scheduleAtFixedRate(() -> {
+ logger.debug(">>> OperationContext rate information:
synReplica={}, replicationLineup = {}. replicationDone = {}, pending (back
pressure) = {}", configuration.isMirrorReplicaSync(),
context.getReplicationLineUpField(), context.getReplicated(),
(context.getReplicationLineUpField() - context.getReplicated()));
+ }, DEBUG_CONTEXT_RATE, DEBUG_CONTEXT_RATE, TimeUnit.MILLISECONDS);
+ }
+ } else {
+ if (scheduledRateDebug != null) {
+ scheduledRateDebug.cancel(true);
+ scheduledRateDebug = null;
+ }
+ }
Review Comment:
Can this go in its own method to separate it out for readability (of what
remains), rather than making this already-large method even more so?
##########
tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/brokerConnection/MirrorInfiniteRetryReplicaTest.java:
##########
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.smoke.brokerConnection;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.io.File;
+import java.io.StringWriter;
+import java.lang.invoke.MethodHandles;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.management.SimpleManagement;
+import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
+import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.util.ServerUtil;
+import org.apache.activemq.artemis.utils.FileUtil;
+import org.apache.activemq.artemis.utils.TestParameters;
+import org.apache.activemq.artemis.utils.Wait;
+import org.apache.activemq.artemis.utils.cli.helper.HelperCreate;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class MirrorInfiniteRetryReplicaTest extends SmokeTestBase {
+
+ private static final String TEST_NAME = "LATE_RETRY_MIRROR_SOAK";
Review Comment:
Should the constant be more related to the class name / vice versa?
##########
tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/brokerConnection/MirrorInfiniteRetryReplicaTest.java:
##########
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.smoke.brokerConnection;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.io.File;
+import java.io.StringWriter;
+import java.lang.invoke.MethodHandles;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.management.SimpleManagement;
+import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
+import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.util.ServerUtil;
+import org.apache.activemq.artemis.utils.FileUtil;
+import org.apache.activemq.artemis.utils.TestParameters;
+import org.apache.activemq.artemis.utils.Wait;
+import org.apache.activemq.artemis.utils.cli.helper.HelperCreate;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class MirrorInfiniteRetryReplicaTest extends SmokeTestBase {
+
+ private static final String TEST_NAME = "LATE_RETRY_MIRROR_SOAK";
+
+ private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ // Set this to true and log4j will be configured with some relevant
log.trace for the AckManager at the server's
+ private static final boolean REUSE_SERVERS =
Boolean.parseBoolean(TestParameters.testProperty(TEST_NAME, "REUSE_SERVERS",
"false"));
+
+ /*
+ * Time each consumer takes to process a message received to allow some
messages accumulating.
+ * This sleep happens right before the commit.
+ */
+ private static final String QUEUE_NAME = "queueTest";
+
+ private static String body;
+
+ static {
+ StringWriter writer = new StringWriter();
+ while (writer.getBuffer().length() < 30 * 1024) {
+ writer.append("The sky is blue, ..... watch out for poop from the
birds though!...");
+ }
+ body = writer.toString();
+ }
+
+ public static final String DC1_NODE = "AckLateRetrySoakTest/DC1";
+ public static final String DC2_NODE = "AckLateRetrySoakTest/DC2";
+ public static final String DC2_REPLICA_NODE =
"AckLateRetrySoakTest/DC2_REPLICA";
+ public static final String DC1_REPLICA_NODE =
"AckLateRetrySoakTest/DC1_REPLICA";
+
+ volatile Process processDC1;
+ volatile Process processDC2;
+ volatile Process processDC1_REPLICA;
+ volatile Process processDC2_REPLICA;
+
+ @AfterEach
+ public void destroyServers() throws Exception {
+ if (processDC2_REPLICA != null) {
+ processDC2_REPLICA.destroyForcibly();
+ processDC2_REPLICA.waitFor(1, TimeUnit.MINUTES);
+ processDC2_REPLICA = null;
+ }
+ if (processDC1_REPLICA != null) {
+ processDC1_REPLICA.destroyForcibly();
+ processDC1_REPLICA.waitFor(1, TimeUnit.MINUTES);
+ processDC1_REPLICA = null;
+ }
+ if (processDC1 != null) {
+ processDC1.destroyForcibly();
+ processDC1.waitFor(1, TimeUnit.MINUTES);
+ processDC1 = null;
+ }
+ if (processDC2 != null) {
+ processDC2.destroyForcibly();
+ processDC2.waitFor(1, TimeUnit.MINUTES);
+ processDC2 = null;
+ }
+ }
+
+ private static final String DC1_IP = "localhost:61616";
+ private static final String DC1_BACKUP_IP = "localhost:61617";
+ private static final String DC2_IP = "localhost:61618";
+ private static final String DC2_BACKUP_IP = "localhost:61619";
+
+ private static String uri(String ip) {
+ return "tcp://" + ip;
+ }
+
+ private static String uriWithAlternate(String ip, String alternate) {
+ return "tcp://" + ip + "#tcp://" + alternate;
+ }
+
+ private static void createMirroredServer(String serverName,
+ String connectionName,
+ String mirrorURI,
+ int porOffset,
+ boolean replicated,
+ String clusterStatic) throws
Exception {
+ File serverLocation = getFileServerLocation(serverName);
+ if (REUSE_SERVERS && serverLocation.exists()) {
+ deleteDirectory(new File(serverLocation, "data"));
+ return;
+ }
+ deleteDirectory(serverLocation);
+
+ HelperCreate cliCreateServer = new HelperCreate();
+
cliCreateServer.setAllowAnonymous(true).setArtemisInstance(serverLocation);
+ cliCreateServer.setNoWeb(true);
+ cliCreateServer.setArgs("--no-stomp-acceptor", "--no-hornetq-acceptor",
"--no-mqtt-acceptor", "--no-amqp-acceptor", "--max-hops", "1", "--name",
DC1_NODE);
+ cliCreateServer.addArgs("--queues", QUEUE_NAME);
+ cliCreateServer.setPortOffset(porOffset);
+ if (replicated) {
+ cliCreateServer.setReplicated(true);
+ cliCreateServer.setStaticCluster(clusterStatic);
+ cliCreateServer.setClustered(true);
+ } else {
+ cliCreateServer.setClustered(false);
+ }
+
+ cliCreateServer.createServer();
+
+ Properties brokerProperties = new Properties();
+ brokerProperties.put("messageExpiryScanPeriod", "1000");
+ brokerProperties.put("AMQPConnections." + connectionName + ".uri",
mirrorURI);
+ brokerProperties.put("AMQPConnections." + connectionName +
".retryInterval", "1000");
+ brokerProperties.put("AMQPConnections." + connectionName + ".type",
AMQPBrokerConnectionAddressType.MIRROR.toString());
+ brokerProperties.put("AMQPConnections." + connectionName +
".connectionElements.mirror.sync", "false");
+ brokerProperties.put("largeMessageSync", "false");
+
+ brokerProperties.put("addressSettings.#.maxSizeMessages", "50000");
+ brokerProperties.put("addressSettings.#.maxReadPageMessages", "2000");
+ brokerProperties.put("addressSettings.#.maxReadPageBytes", "-1");
+ brokerProperties.put("addressSettings.#.prefetchPageMessages", "500");
+ // if we don't use pageTransactions we may eventually get a few
duplicates
+ brokerProperties.put("mirrorPageTransaction", "true");
+
+ brokerProperties.put("mirrorAckManagerQueueAttempts", "2");
+ brokerProperties.put("mirrorAckManagerPageAttempts", "500000");
+ brokerProperties.put("mirrorAckManagerRetryDelay", "1000");
+
+ File brokerPropertiesFile = new File(serverLocation,
"broker.properties");
+ saveProperties(brokerProperties, brokerPropertiesFile);
+
+ File brokerXml = new File(serverLocation, "/etc/broker.xml");
+ assertTrue(brokerXml.exists());
+ // Adding redistribution delay to broker configuration
+ assertTrue(FileUtil.findReplace(brokerXml, "<address-setting
match=\"#\">", "<address-setting match=\"#\">\n\n" + "
<redistribution-delay>0</redistribution-delay>\n"));
+
+ replaceLogs(serverLocation);
+
+ }
+
+ private static void replaceLogs(File serverLocation) throws Exception {
+ File log4j = new File(serverLocation, "/etc/log4j2.properties");
+ assertTrue(FileUtil.findReplace(log4j,
"logger.artemis_utils.level=INFO", "logger.artemis_utils.level=INFO\n" + "\n" +
"logger.endpoint.name=org.apache.activemq.artemis.core.replication.ReplicationEndpoint\n"
+ "logger.endpoint.level=DEBUG\n" +
"logger.ackmanager.name=org.apache.activemq.artemis.protocol.amqp.connect.mirror.AckManager\n"
+ "logger.ackmanager.level=TRACE\n" + "appender.console.filter.threshold.type
= ThresholdFilter\n" + "appender.console.filter.threshold.level = trace"));
+ }
+
+ private static void createMirroredBackupServer(String serverName,
+ int porOffset,
Review Comment:
portOffset?
##########
tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/brokerConnection/MirrorInfiniteRetryReplicaTest.java:
##########
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.smoke.brokerConnection;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.io.File;
+import java.io.StringWriter;
+import java.lang.invoke.MethodHandles;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.management.SimpleManagement;
+import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
+import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.util.ServerUtil;
+import org.apache.activemq.artemis.utils.FileUtil;
+import org.apache.activemq.artemis.utils.TestParameters;
+import org.apache.activemq.artemis.utils.Wait;
+import org.apache.activemq.artemis.utils.cli.helper.HelperCreate;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class MirrorInfiniteRetryReplicaTest extends SmokeTestBase {
+
+ private static final String TEST_NAME = "LATE_RETRY_MIRROR_SOAK";
+
+ private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ // Set this to true and log4j will be configured with some relevant
log.trace for the AckManager at the server's
+ private static final boolean REUSE_SERVERS =
Boolean.parseBoolean(TestParameters.testProperty(TEST_NAME, "REUSE_SERVERS",
"false"));
+
+ /*
+ * Time each consumer takes to process a message received to allow some
messages accumulating.
+ * This sleep happens right before the commit.
+ */
+ private static final String QUEUE_NAME = "queueTest";
+
+ private static String body;
+
+ static {
+ StringWriter writer = new StringWriter();
+ while (writer.getBuffer().length() < 30 * 1024) {
+ writer.append("The sky is blue, ..... watch out for poop from the
birds though!...");
+ }
+ body = writer.toString();
+ }
+
+ public static final String DC1_NODE = "AckLateRetrySoakTest/DC1";
+ public static final String DC2_NODE = "AckLateRetrySoakTest/DC2";
+ public static final String DC2_REPLICA_NODE =
"AckLateRetrySoakTest/DC2_REPLICA";
+ public static final String DC1_REPLICA_NODE =
"AckLateRetrySoakTest/DC1_REPLICA";
+
+ volatile Process processDC1;
+ volatile Process processDC2;
+ volatile Process processDC1_REPLICA;
+ volatile Process processDC2_REPLICA;
+
+ @AfterEach
+ public void destroyServers() throws Exception {
+ if (processDC2_REPLICA != null) {
+ processDC2_REPLICA.destroyForcibly();
+ processDC2_REPLICA.waitFor(1, TimeUnit.MINUTES);
+ processDC2_REPLICA = null;
+ }
+ if (processDC1_REPLICA != null) {
+ processDC1_REPLICA.destroyForcibly();
+ processDC1_REPLICA.waitFor(1, TimeUnit.MINUTES);
+ processDC1_REPLICA = null;
+ }
+ if (processDC1 != null) {
+ processDC1.destroyForcibly();
+ processDC1.waitFor(1, TimeUnit.MINUTES);
+ processDC1 = null;
+ }
+ if (processDC2 != null) {
+ processDC2.destroyForcibly();
+ processDC2.waitFor(1, TimeUnit.MINUTES);
+ processDC2 = null;
+ }
+ }
+
+ private static final String DC1_IP = "localhost:61616";
+ private static final String DC1_BACKUP_IP = "localhost:61617";
+ private static final String DC2_IP = "localhost:61618";
+ private static final String DC2_BACKUP_IP = "localhost:61619";
+
+ private static String uri(String ip) {
+ return "tcp://" + ip;
+ }
+
+ private static String uriWithAlternate(String ip, String alternate) {
+ return "tcp://" + ip + "#tcp://" + alternate;
+ }
+
+ private static void createMirroredServer(String serverName,
+ String connectionName,
+ String mirrorURI,
+ int porOffset,
+ boolean replicated,
+ String clusterStatic) throws
Exception {
+ File serverLocation = getFileServerLocation(serverName);
+ if (REUSE_SERVERS && serverLocation.exists()) {
+ deleteDirectory(new File(serverLocation, "data"));
+ return;
+ }
+ deleteDirectory(serverLocation);
+
+ HelperCreate cliCreateServer = new HelperCreate();
+
cliCreateServer.setAllowAnonymous(true).setArtemisInstance(serverLocation);
+ cliCreateServer.setNoWeb(true);
+ cliCreateServer.setArgs("--no-stomp-acceptor", "--no-hornetq-acceptor",
"--no-mqtt-acceptor", "--no-amqp-acceptor", "--max-hops", "1", "--name",
DC1_NODE);
+ cliCreateServer.addArgs("--queues", QUEUE_NAME);
+ cliCreateServer.setPortOffset(porOffset);
+ if (replicated) {
+ cliCreateServer.setReplicated(true);
+ cliCreateServer.setStaticCluster(clusterStatic);
+ cliCreateServer.setClustered(true);
+ } else {
+ cliCreateServer.setClustered(false);
+ }
+
+ cliCreateServer.createServer();
+
+ Properties brokerProperties = new Properties();
+ brokerProperties.put("messageExpiryScanPeriod", "1000");
+ brokerProperties.put("AMQPConnections." + connectionName + ".uri",
mirrorURI);
+ brokerProperties.put("AMQPConnections." + connectionName +
".retryInterval", "1000");
+ brokerProperties.put("AMQPConnections." + connectionName + ".type",
AMQPBrokerConnectionAddressType.MIRROR.toString());
+ brokerProperties.put("AMQPConnections." + connectionName +
".connectionElements.mirror.sync", "false");
+ brokerProperties.put("largeMessageSync", "false");
+
+ brokerProperties.put("addressSettings.#.maxSizeMessages", "50000");
+ brokerProperties.put("addressSettings.#.maxReadPageMessages", "2000");
+ brokerProperties.put("addressSettings.#.maxReadPageBytes", "-1");
+ brokerProperties.put("addressSettings.#.prefetchPageMessages", "500");
+ // if we don't use pageTransactions we may eventually get a few
duplicates
+ brokerProperties.put("mirrorPageTransaction", "true");
+
+ brokerProperties.put("mirrorAckManagerQueueAttempts", "2");
+ brokerProperties.put("mirrorAckManagerPageAttempts", "500000");
+ brokerProperties.put("mirrorAckManagerRetryDelay", "1000");
+
+ File brokerPropertiesFile = new File(serverLocation,
"broker.properties");
+ saveProperties(brokerProperties, brokerPropertiesFile);
+
+ File brokerXml = new File(serverLocation, "/etc/broker.xml");
+ assertTrue(brokerXml.exists());
+ // Adding redistribution delay to broker configuration
+ assertTrue(FileUtil.findReplace(brokerXml, "<address-setting
match=\"#\">", "<address-setting match=\"#\">\n\n" + "
<redistribution-delay>0</redistribution-delay>\n"));
+
+ replaceLogs(serverLocation);
+
+ }
+
+ private static void replaceLogs(File serverLocation) throws Exception {
+ File log4j = new File(serverLocation, "/etc/log4j2.properties");
+ assertTrue(FileUtil.findReplace(log4j,
"logger.artemis_utils.level=INFO", "logger.artemis_utils.level=INFO\n" + "\n" +
"logger.endpoint.name=org.apache.activemq.artemis.core.replication.ReplicationEndpoint\n"
+ "logger.endpoint.level=DEBUG\n" +
"logger.ackmanager.name=org.apache.activemq.artemis.protocol.amqp.connect.mirror.AckManager\n"
+ "logger.ackmanager.level=TRACE\n" + "appender.console.filter.threshold.type
= ThresholdFilter\n" + "appender.console.filter.threshold.level = trace"));
+ }
+
+ private static void createMirroredBackupServer(String serverName,
+ int porOffset,
+ String clusterStatic,
+ String mirrorURI) throws
Exception {
+ File serverLocation = getFileServerLocation(serverName);
+ if (REUSE_SERVERS && serverLocation.exists()) {
+ deleteDirectory(new File(serverLocation, "data"));
+ return;
+ }
+ deleteDirectory(serverLocation);
+
+ HelperCreate cliCreateServer = new HelperCreate();
+
cliCreateServer.setAllowAnonymous(true).setArtemisInstance(serverLocation);
+ cliCreateServer.setMessageLoadBalancing("ON_DEMAND");
+ cliCreateServer.setNoWeb(true);
+ cliCreateServer.setArgs("--no-stomp-acceptor", "--no-hornetq-acceptor",
"--no-mqtt-acceptor", "--no-amqp-acceptor", "--max-hops", "1", "--name",
DC1_NODE);
+ cliCreateServer.setPortOffset(porOffset);
+ cliCreateServer.setClustered(true);
+ cliCreateServer.setReplicated(true);
+ cliCreateServer.setBackup(true);
+ cliCreateServer.setStaticCluster(clusterStatic);
+ cliCreateServer.createServer();
+
+ Properties brokerProperties = new Properties();
+ brokerProperties.put("messageExpiryScanPeriod", "1000");
+ brokerProperties.put("AMQPConnections.mirror.uri", mirrorURI);
+ brokerProperties.put("AMQPConnections.mirror.retryInterval", "1000");
+ brokerProperties.put("AMQPConnections.mirror.type",
AMQPBrokerConnectionAddressType.MIRROR.toString());
+
brokerProperties.put("AMQPConnections.mirror.connectionElements.mirror.sync",
"false");
+ brokerProperties.put("largeMessageSync", "false");
+
+ brokerProperties.put("addressSettings.#.maxSizeMessages", "1");
+ brokerProperties.put("addressSettings.#.maxReadPageMessages", "2000");
+ brokerProperties.put("addressSettings.#.maxReadPageBytes", "-1");
+ brokerProperties.put("addressSettings.#.prefetchPageMessages", "500");
+
+ brokerProperties.put("mirrorAckManagerQueueAttempts", "200");
+ brokerProperties.put("mirrorAckManagerPageAttempts", "200000");
+ brokerProperties.put("mirrorAckManagerRetryDelay", "10");
+
+ // if we don't use pageTransactions we may eventually get a few
duplicates
+ brokerProperties.put("mirrorPageTransaction", "true");
+ File brokerPropertiesFile = new File(serverLocation,
"broker.properties");
+ saveProperties(brokerProperties, brokerPropertiesFile);
+
+ File brokerXml = new File(serverLocation, "/etc/broker.xml");
+ assertTrue(brokerXml.exists());
+ // Adding redistribution delay to broker configuration
+ assertTrue(FileUtil.findReplace(brokerXml, "<address-setting
match=\"#\">", "<address-setting match=\"#\">\n\n" + "
<redistribution-delay>0</redistribution-delay> <!-- added by
SimpleMirrorSoakTest.java --> \n"));
+ assertTrue(FileUtil.findReplace(brokerXml,
"<page-size-bytes>10M</page-size-bytes>",
"<page-size-bytes>100K</page-size-bytes>"));
+
+ replaceLogs(serverLocation);
+ }
+
+ public static void createRealServers() throws Exception {
+ createMirroredServer(DC1_NODE, "mirror", uriWithAlternate(DC2_IP,
DC2_BACKUP_IP), 0, true, uri(DC1_BACKUP_IP));
+ createMirroredBackupServer(DC1_REPLICA_NODE, 1, uri(DC1_IP),
uriWithAlternate(DC2_IP, DC2_BACKUP_IP));
+ createMirroredServer(DC2_NODE, "mirror", uriWithAlternate(DC1_IP,
DC1_BACKUP_IP), 2, true, uri(DC2_BACKUP_IP));
+ createMirroredBackupServer(DC2_REPLICA_NODE, 3, uri(DC2_IP),
uriWithAlternate(DC1_IP, DC1_BACKUP_IP));
+ }
+
+ @Test
+ public void testConsumersAttached() throws Exception {
+ createRealServers();
+
+ SimpleManagement managementDC1 = new SimpleManagement(uri(DC1_IP), null,
null);
+ SimpleManagement managementDC2 = new SimpleManagement(uri(DC2_IP), null,
null);
+
+ processDC2 = startServer(DC2_NODE, -1, -1, new
File(getServerLocation(DC2_NODE), "broker.properties"));
+ processDC2_REPLICA = startServer(DC2_REPLICA_NODE, -1, -1, new
File(getServerLocation(DC2_REPLICA_NODE), "broker.properties"));
+
+ processDC1 = startServer(DC1_NODE, -1, -1, new
File(getServerLocation(DC1_NODE), "broker.properties"));
+ processDC1_REPLICA = startServer(DC1_REPLICA_NODE, -1, -1, new
File(getServerLocation(DC1_REPLICA_NODE), "broker.properties"));
+
+ ServerUtil.waitForServerToStart(2, 10_000);
+ Wait.assertTrue(managementDC2::isReplicaSync);
+
+ ServerUtil.waitForServerToStart(0, 10_000);
+ Wait.assertTrue(managementDC1::isReplicaSync);
+
+ runAfter(() -> managementDC1.close());
+ runAfter(() -> managementDC2.close());
+
+ ConnectionFactory connectionFactoryDC1A =
CFUtil.createConnectionFactory("amqp", uri(DC1_IP));
+ try (Connection connection = connectionFactoryDC1A.createConnection()) {
+ Session session = connection.createSession(true,
Session.SESSION_TRANSACTED);
+ MessageProducer producer =
session.createProducer(session.createQueue(QUEUE_NAME));
+ TextMessage message = session.createTextMessage(body);
+ message.setIntProperty("i", 1);
+ message.setBooleanProperty("large", false);
+ producer.send(message);
+ session.commit();
+
+ session.commit();
Review Comment:
Would committing once be enough? The second session.commit() looks redundant.
##########
artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextUnitTest.java:
##########
@@ -292,6 +292,97 @@ public void done() {
}
}
+ @Test
+ public void testIgnoreReplication() throws Exception {
+ ExecutorService executor =
Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory(getClass().getName()));
+ ConcurrentLinkedQueue<Long> completions = new ConcurrentLinkedQueue();
+ final int N = 500;
+ try {
+ final OperationContextImpl impl = new OperationContextImpl(new
OrderedExecutor(executor));
+
+ // pending work to queue completions till done
+ impl.storeLineUp();
+ impl.setSyncReplication(false);
+ impl.replicationLineUp();
+
+ for (long l = 0; l < N; l++) {
+ long finalL = l;
+ impl.executeOnCompletion(new IOCallback() {
+ @Override
+ public void onError(int errorCode, String errorMessage) {
+ }
+
+ @Override
+ public void done() {
+ completions.add(finalL);
+ }
+ });
+ }
+
+ Thread.sleep(100);
+ assertEquals(0, completions.size());
+
+ impl.done();
+ Wait.assertEquals(N, ()-> completions.size(), 5000, 100);
+
+ impl.replicationDone();
+
+
+ for (long i = 0; i < N; i++) {
+ assertEquals(i, (long) completions.poll(), "ordered");
+ }
+
+ } finally {
+ executor.shutdownNow();
+ }
+ }
+
+
+ @Test
+ public void testWaitOnReplication() throws Exception {
+ ExecutorService executor =
Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory(getClass().getName()));
+ ConcurrentLinkedQueue<Long> completions = new ConcurrentLinkedQueue();
+ final int N = 500;
+ try {
+ final OperationContextImpl impl = new OperationContextImpl(new
OrderedExecutor(executor));
+
+ // pending work to queue completions till done
+ impl.storeLineUp();
+ impl.replicationLineUp();
+
+ for (long l = 0; l < N; l++) {
+ long finalL = l;
+ impl.executeOnCompletion(new IOCallback() {
+ @Override
+ public void onError(int errorCode, String errorMessage) {
+ }
+
+ @Override
+ public void done() {
+ completions.add(finalL);
+ }
+ });
+ }
+
+ impl.done();
+
+ Thread.sleep(100);
+ assertEquals(0, completions.size());
+
+ impl.replicationDone();
+ Wait.assertEquals(N, ()-> completions.size(), 5000, 100);
+
+
+
+ for (long i = 0; i < N; i++) {
+ assertEquals(i, (long) completions.poll(), "ordered");
+ }
+
+ } finally {
+ executor.shutdownNow();
+ }
+ }
+
Review Comment:
Various superfluous newlines. Same in earlier test.
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java:
##########
@@ -201,12 +219,31 @@ public void flow() {
creditRunnable.run();
}
+
@Override
protected void actualDelivery(Message message, Delivery delivery,
DeliveryAnnotations deliveryAnnotations, Receiver receiver, Transaction tx) {
recoverContext();
+
+
Review Comment:
superfluous newline
##########
tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/brokerConnection/MirrorInfiniteRetryReplicaTest.java:
##########
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.smoke.brokerConnection;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.io.File;
+import java.io.StringWriter;
+import java.lang.invoke.MethodHandles;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.management.SimpleManagement;
+import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
+import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.util.ServerUtil;
+import org.apache.activemq.artemis.utils.FileUtil;
+import org.apache.activemq.artemis.utils.TestParameters;
+import org.apache.activemq.artemis.utils.Wait;
+import org.apache.activemq.artemis.utils.cli.helper.HelperCreate;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class MirrorInfiniteRetryReplicaTest extends SmokeTestBase {
+
+ private static final String TEST_NAME = "LATE_RETRY_MIRROR_SOAK";
+
+ private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ // Set this to true and log4j will be configured with some relevant
log.trace for the AckManager at the server's
+ private static final boolean REUSE_SERVERS =
Boolean.parseBoolean(TestParameters.testProperty(TEST_NAME, "REUSE_SERVERS",
"false"));
+
+ /*
+ * Time each consumer takes to process a message received to allow some
messages accumulating.
+ * This sleep happens right before the commit.
+ */
+ private static final String QUEUE_NAME = "queueTest";
+
+ private static String body;
+
+ static {
+ StringWriter writer = new StringWriter();
+ while (writer.getBuffer().length() < 30 * 1024) {
+ writer.append("The sky is blue, ..... watch out for poop from the
birds though!...");
+ }
+ body = writer.toString();
+ }
+
+ public static final String DC1_NODE = "AckLateRetrySoakTest/DC1";
+ public static final String DC2_NODE = "AckLateRetrySoakTest/DC2";
+ public static final String DC2_REPLICA_NODE =
"AckLateRetrySoakTest/DC2_REPLICA";
+ public static final String DC1_REPLICA_NODE =
"AckLateRetrySoakTest/DC1_REPLICA";
+
+ volatile Process processDC1;
+ volatile Process processDC2;
+ volatile Process processDC1_REPLICA;
+ volatile Process processDC2_REPLICA;
+
+ @AfterEach
+ public void destroyServers() throws Exception {
+ if (processDC2_REPLICA != null) {
+ processDC2_REPLICA.destroyForcibly();
+ processDC2_REPLICA.waitFor(1, TimeUnit.MINUTES);
+ processDC2_REPLICA = null;
+ }
+ if (processDC1_REPLICA != null) {
+ processDC1_REPLICA.destroyForcibly();
+ processDC1_REPLICA.waitFor(1, TimeUnit.MINUTES);
+ processDC1_REPLICA = null;
+ }
+ if (processDC1 != null) {
+ processDC1.destroyForcibly();
+ processDC1.waitFor(1, TimeUnit.MINUTES);
+ processDC1 = null;
+ }
+ if (processDC2 != null) {
+ processDC2.destroyForcibly();
+ processDC2.waitFor(1, TimeUnit.MINUTES);
+ processDC2 = null;
+ }
+ }
+
+ private static final String DC1_IP = "localhost:61616";
+ private static final String DC1_BACKUP_IP = "localhost:61617";
+ private static final String DC2_IP = "localhost:61618";
+ private static final String DC2_BACKUP_IP = "localhost:61619";
+
+ private static String uri(String ip) {
+ return "tcp://" + ip;
+ }
+
+ private static String uriWithAlternate(String ip, String alternate) {
+ return "tcp://" + ip + "#tcp://" + alternate;
+ }
+
+ private static void createMirroredServer(String serverName,
+ String connectionName,
+ String mirrorURI,
+ int porOffset,
+ boolean replicated,
+ String clusterStatic) throws
Exception {
+ File serverLocation = getFileServerLocation(serverName);
+ if (REUSE_SERVERS && serverLocation.exists()) {
+ deleteDirectory(new File(serverLocation, "data"));
+ return;
+ }
+ deleteDirectory(serverLocation);
+
+ HelperCreate cliCreateServer = new HelperCreate();
+
cliCreateServer.setAllowAnonymous(true).setArtemisInstance(serverLocation);
+ cliCreateServer.setNoWeb(true);
+ cliCreateServer.setArgs("--no-stomp-acceptor", "--no-hornetq-acceptor",
"--no-mqtt-acceptor", "--no-amqp-acceptor", "--max-hops", "1", "--name",
DC1_NODE);
+ cliCreateServer.addArgs("--queues", QUEUE_NAME);
+ cliCreateServer.setPortOffset(porOffset);
+ if (replicated) {
+ cliCreateServer.setReplicated(true);
+ cliCreateServer.setStaticCluster(clusterStatic);
+ cliCreateServer.setClustered(true);
+ } else {
+ cliCreateServer.setClustered(false);
+ }
+
+ cliCreateServer.createServer();
+
+ Properties brokerProperties = new Properties();
+ brokerProperties.put("messageExpiryScanPeriod", "1000");
+ brokerProperties.put("AMQPConnections." + connectionName + ".uri",
mirrorURI);
+ brokerProperties.put("AMQPConnections." + connectionName +
".retryInterval", "1000");
+ brokerProperties.put("AMQPConnections." + connectionName + ".type",
AMQPBrokerConnectionAddressType.MIRROR.toString());
+ brokerProperties.put("AMQPConnections." + connectionName +
".connectionElements.mirror.sync", "false");
+ brokerProperties.put("largeMessageSync", "false");
+
+ brokerProperties.put("addressSettings.#.maxSizeMessages", "50000");
+ brokerProperties.put("addressSettings.#.maxReadPageMessages", "2000");
+ brokerProperties.put("addressSettings.#.maxReadPageBytes", "-1");
+ brokerProperties.put("addressSettings.#.prefetchPageMessages", "500");
+ // if we don't use pageTransactions we may eventually get a few
duplicates
+ brokerProperties.put("mirrorPageTransaction", "true");
+
+ brokerProperties.put("mirrorAckManagerQueueAttempts", "2");
+ brokerProperties.put("mirrorAckManagerPageAttempts", "500000");
+ brokerProperties.put("mirrorAckManagerRetryDelay", "1000");
+
+ File brokerPropertiesFile = new File(serverLocation,
"broker.properties");
+ saveProperties(brokerProperties, brokerPropertiesFile);
+
+ File brokerXml = new File(serverLocation, "/etc/broker.xml");
+ assertTrue(brokerXml.exists());
+ // Adding redistribution delay to broker configuration
+ assertTrue(FileUtil.findReplace(brokerXml, "<address-setting
match=\"#\">", "<address-setting match=\"#\">\n\n" + "
<redistribution-delay>0</redistribution-delay>\n"));
+
+ replaceLogs(serverLocation);
+
+ }
+
+ private static void replaceLogs(File serverLocation) throws Exception {
+ File log4j = new File(serverLocation, "/etc/log4j2.properties");
+ assertTrue(FileUtil.findReplace(log4j,
"logger.artemis_utils.level=INFO", "logger.artemis_utils.level=INFO\n" + "\n" +
"logger.endpoint.name=org.apache.activemq.artemis.core.replication.ReplicationEndpoint\n"
+ "logger.endpoint.level=DEBUG\n" +
"logger.ackmanager.name=org.apache.activemq.artemis.protocol.amqp.connect.mirror.AckManager\n"
+ "logger.ackmanager.level=TRACE\n" + "appender.console.filter.threshold.type
= ThresholdFilter\n" + "appender.console.filter.threshold.level = trace"));
+ }
+
+ private static void createMirroredBackupServer(String serverName,
+ int porOffset,
+ String clusterStatic,
+ String mirrorURI) throws
Exception {
+ File serverLocation = getFileServerLocation(serverName);
+ if (REUSE_SERVERS && serverLocation.exists()) {
+ deleteDirectory(new File(serverLocation, "data"));
+ return;
+ }
+ deleteDirectory(serverLocation);
+
+ HelperCreate cliCreateServer = new HelperCreate();
+
cliCreateServer.setAllowAnonymous(true).setArtemisInstance(serverLocation);
+ cliCreateServer.setMessageLoadBalancing("ON_DEMAND");
+ cliCreateServer.setNoWeb(true);
+ cliCreateServer.setArgs("--no-stomp-acceptor", "--no-hornetq-acceptor",
"--no-mqtt-acceptor", "--no-amqp-acceptor", "--max-hops", "1", "--name",
DC1_NODE);
+ cliCreateServer.setPortOffset(porOffset);
+ cliCreateServer.setClustered(true);
+ cliCreateServer.setReplicated(true);
+ cliCreateServer.setBackup(true);
+ cliCreateServer.setStaticCluster(clusterStatic);
+ cliCreateServer.createServer();
+
+ Properties brokerProperties = new Properties();
+ brokerProperties.put("messageExpiryScanPeriod", "1000");
+ brokerProperties.put("AMQPConnections.mirror.uri", mirrorURI);
+ brokerProperties.put("AMQPConnections.mirror.retryInterval", "1000");
+ brokerProperties.put("AMQPConnections.mirror.type",
AMQPBrokerConnectionAddressType.MIRROR.toString());
+
brokerProperties.put("AMQPConnections.mirror.connectionElements.mirror.sync",
"false");
+ brokerProperties.put("largeMessageSync", "false");
+
+ brokerProperties.put("addressSettings.#.maxSizeMessages", "1");
+ brokerProperties.put("addressSettings.#.maxReadPageMessages", "2000");
+ brokerProperties.put("addressSettings.#.maxReadPageBytes", "-1");
+ brokerProperties.put("addressSettings.#.prefetchPageMessages", "500");
+
+ brokerProperties.put("mirrorAckManagerQueueAttempts", "200");
+ brokerProperties.put("mirrorAckManagerPageAttempts", "200000");
+ brokerProperties.put("mirrorAckManagerRetryDelay", "10");
+
+ // if we don't use pageTransactions we may eventually get a few
duplicates
+ brokerProperties.put("mirrorPageTransaction", "true");
+ File brokerPropertiesFile = new File(serverLocation,
"broker.properties");
+ saveProperties(brokerProperties, brokerPropertiesFile);
+
+ File brokerXml = new File(serverLocation, "/etc/broker.xml");
+ assertTrue(brokerXml.exists());
+ // Adding redistribution delay to broker configuration
+ assertTrue(FileUtil.findReplace(brokerXml, "<address-setting
match=\"#\">", "<address-setting match=\"#\">\n\n" + "
<redistribution-delay>0</redistribution-delay> <!-- added by
SimpleMirrorSoakTest.java --> \n"));
Review Comment:
The inserted XML comment says "added by SimpleMirrorSoakTest.java" - is it meant to reference this test class (MirrorInfiniteRetryReplicaTest) rather than another one?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact