[
https://issues.apache.org/jira/browse/ARTEMIS-5001?focusedWorklogId=932375&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-932375
]
ASF GitHub Bot logged work on ARTEMIS-5001:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 29/Aug/24 11:20
Start Date: 29/Aug/24 11:20
Worklog Time Spent: 10m
Work Description: gemmellr commented on code in PR #5172:
URL: https://github.com/apache/activemq-artemis/pull/5172#discussion_r1736009266
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonAbstractReceiver.java:
##########
@@ -91,8 +92,8 @@ public AMQPSessionContext getSessionContext() {
return protonSession;
}
- protected void recoverContext() {
- sessionSPI.recoverContext();
+ protected OperationContext recoverContext() {
Review Comment:
Think this could use the matching javadoc to describe that it is really the
_current/old_ context being returned, not the 'recovered' one that will be used
afterwards.
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java:
##########
@@ -377,6 +428,30 @@ public String toString() {
}
}
+
+ static final class IgnoreReplicationTaskHolder {
+ @Override
+ public String toString() {
+ return "TaskHolder [storeLined=" + storeLined +
Review Comment:
c&p mismatch of class name
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java:
##########
@@ -419,21 +494,17 @@ public boolean waitCompletion(final long timeout) throws InterruptedException, A
@Override
public String toString() {
- return "OperationContextImpl [" + hashCode() + "] [minimalStore=" + minimalStore +
+ return "OperationContextImpl [" + hashCode() +
Review Comment:
The hashcode used to be separated on its own, "[hashcode][others=foo]", did
you mean to put them all in the same block?
Also, would it be worth switching to System.identityHashCode(this) to make
it clearer that that is the value desired, given this doesn't even implement
hashCode()?
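A minimal sketch of the reviewer's point (the `Context` class below is a hypothetical stand-in for `OperationContextImpl`, not code from the PR): when a class never overrides `hashCode()`, `Object.hashCode()` and `System.identityHashCode(this)` are guaranteed to return the same value, so switching `toString()` to the latter changes nothing observable but makes the identity-hash intent explicit:

```java
public class IdentityHashDemo {

   // Hypothetical stand-in for OperationContextImpl: no hashCode() override.
   static final class Context {
      @Override
      public String toString() {
         // identityHashCode states explicitly that the identity hash is wanted,
         // and keeps working even if hashCode() is overridden later.
         return "Context [" + System.identityHashCode(this) + "]";
      }
   }

   public static void main(String[] args) {
      Context c = new Context();
      // With no hashCode() override, both calls are guaranteed to agree.
      System.out.println(c.hashCode() == System.identityHashCode(c)); // prints true
   }
}
```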
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java:
##########
@@ -139,7 +140,14 @@ public void endRetry() {
// schedule a retry
if (!sortRetries().isEmpty()) {
- scheduledComponent.delay();
+ ActiveMQScheduledComponent scheduleComponentReference = scheduledComponent;
+ if (scheduleComponentReference != null) {
+ try {
+ scheduleComponentReference.delay();
+ } catch (RejectedExecutionException thatsOK) {
+ logger.debug(thatsOK.getMessage(), thatsOK);
Review Comment:
Logging _thatsOK_ already prints its message, so rather than simply
duplicating it, adding a useful descriptive message would be helpful to anyone
who later encounters this logging and stack trace, e.g. to understand _what_
was rejected without having to go look up the code.
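The suggestion could look like the self-contained sketch below (an illustration, not the PR's code; the message text is hypothetical, and a shut-down `ScheduledExecutorService` stands in for the stopped `ActiveMQScheduledComponent`):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class RejectedLoggingDemo {

   public static void main(String[] args) {
      ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
      scheduler.shutdown(); // a stopped scheduler rejects any further task

      try {
         scheduler.schedule(() -> { }, 1, TimeUnit.SECONDS);
      } catch (RejectedExecutionException thatsOK) {
         // Describe *what* was rejected; a logger given the exception as the
         // last argument appends its own message and stack trace anyway.
         System.out.println("retry delay rejected, scheduler is likely stopped: " + thatsOK);
      }
   }
}
```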
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java:
##########
@@ -165,56 +185,74 @@ public synchronized void replicationDone() {
@Override
public void executeOnCompletion(IOCallback runnable) {
- executeOnCompletion(runnable, false);
+ executeOnCompletion(runnable, OperationConsistencyLevel.FULL);
}
@Override
- public void executeOnCompletion(final IOCallback completion, final boolean storeOnly) {
+ public void executeOnCompletion(final IOCallback completion, final OperationConsistencyLevel consistencyLevel) {
boolean executeNow = false;
synchronized (this) {
if (errorCode == -1) {
final long storeLined = STORE_LINEUP_UPDATER.get(this);
final long pageLined = PAGE_LINEUP_UPDATER.get(this);
final long replicationLined = REPLICATION_LINEUP_UPDATER.get(this);
- if (storeOnly) {
- if (storeOnlyTasks == null) {
- storeOnlyTasks = new LinkedList<>();
- }
- } else {
- if (tasks == null) {
- tasks = new LinkedList<>();
- minimalReplicated = replicationLined;
- minimalStore = storeLined;
- minimalPage = pageLined;
- }
- }
- // On this case, we can just execute the context directly
-
- if (replicationLined == replicated && storeLined == stored && pageLined == paged) {
- // We want to avoid the executor if everything is complete...
- // However, we can't execute the context if there are executions pending
- // We need to use the executor on this case
- if (EXECUTORS_PENDING_UPDATER.get(this) == 0) {
- // No need to use an executor here or a context switch
- // there are no actions pending.. hence we can just execute the task directly on the same thread
- executeNow = true;
- } else {
- execute(completion);
- }
- } else {
- if (storeOnly) {
- if (storeLined == stored && EXECUTORS_PENDING_UPDATER.get(this) == 0) {
- executeNow = true;
+ switch (consistencyLevel) {
+ case STORAGE:
+ if (storeOnlyTasks == null) {
+ storeOnlyTasks = new LinkedList<>();
+ }
+ if (storeLined == stored) {
+ if (hasNoPendingExecution()) {
+ executeNow = true;
+ } else {
+ execute(completion);
+ }
} else {
- assert !storeOnlyTasks.isEmpty() ? storeOnlyTasks.peekLast().storeLined <= storeLined : true;
storeOnlyTasks.add(new StoreOnlyTaskHolder(completion, storeLined));
}
- } else {
- // ensure total ordering
- assert validateTasksAdd(storeLined, replicationLined, pageLined);
- tasks.add(new TaskHolder(completion, storeLined, replicationLined, pageLined));
- }
+ break;
+
+ case IGNORE_REPLICATION:
+ if (ignoreReplicationTasks == null) {
+ ignoreReplicationTasks = new LinkedList<>();
+ }
+
+ if (storeLined == stored && pageLined == paged) {
+ if (hasNoPendingExecution()) {
+ // No need to use an executor here or a context switch
+ // there are no actions pending.. hence we can just execute the task directly on the same thread
Review Comment:
Bumping as the UI is hiding the thread. As before, either this comment isn't
important enough to be needed at all, or it should really be on the _first_
instance this occurs in the method as well / instead (16 lines earlier).
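For readers following the hunk above, the consistency-level dispatch it introduces can be sketched roughly as follows (`Level`, `submit` and the boolean flags are hypothetical simplifications, not the PR's API): a task runs immediately only when the counters its level cares about have caught up, and replication progress is deliberately ignored for `IGNORE_REPLICATION`:

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class ConsistencyDispatchDemo {

   // Hypothetical simplification of OperationConsistencyLevel.
   enum Level { FULL, STORAGE, IGNORE_REPLICATION }

   static final Deque<Runnable> pending = new ArrayDeque<>();

   // Run the task now if everything its level waits for is already complete,
   // otherwise queue it to run once the missing completions arrive.
   static boolean submit(Level level, boolean storeDone, boolean pageDone,
                         boolean replicationDone, Runnable task) {
      boolean ready = switch (level) {
         case STORAGE            -> storeDone;
         case IGNORE_REPLICATION -> storeDone && pageDone; // replication ignored
         case FULL               -> storeDone && pageDone && replicationDone;
      };
      if (ready) {
         task.run();
         return true;
      }
      pending.add(task);
      return false;
   }

   public static void main(String[] args) {
      // Replication lag does not block an IGNORE_REPLICATION task...
      System.out.println(submit(Level.IGNORE_REPLICATION, true, true, false, () -> { })); // prints true
      // ...but it does block a FULL one.
      System.out.println(submit(Level.FULL, true, true, false, () -> { })); // prints false
   }
}
```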
##########
tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/brokerConnection/MirrorInfiniteRetryReplicaTest.java:
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.smoke.brokerConnection;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.io.File;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.management.SimpleManagement;
+import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
+import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.util.ServerUtil;
+import org.apache.activemq.artemis.utils.FileUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.apache.activemq.artemis.utils.cli.helper.HelperCreate;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class MirrorInfiniteRetryReplicaTest extends SmokeTestBase {
+
+   private static final String QUEUE_NAME = "MirrorInfiniteRetryReplicaTestQueue";
+
+ public static final String DC1_NODE = "MirrorInfiniteRetryReplicaTest/DC1";
+ public static final String DC2_NODE = "MirrorInfiniteRetryReplicaTest/DC2";
+   public static final String DC2_REPLICA_NODE = "MirrorInfiniteRetryReplicaTest/DC2_REPLICA";
+   public static final String DC1_REPLICA_NODE = "MirrorInfiniteRetryReplicaTest/DC1_REPLICA";
+
+ volatile Process processDC1;
+ volatile Process processDC2;
+ volatile Process processDC1_REPLICA;
+ volatile Process processDC2_REPLICA;
+
+
+ // change this to true to have the server producing more detailed logs
+ private static final boolean TRACE_LOGS = false;
+
+ @AfterEach
+ public void destroyServers() throws Exception {
+ if (processDC2_REPLICA != null) {
+ processDC2_REPLICA.destroyForcibly();
+ processDC2_REPLICA.waitFor(1, TimeUnit.MINUTES);
+ processDC2_REPLICA = null;
+ }
+ if (processDC1_REPLICA != null) {
+ processDC1_REPLICA.destroyForcibly();
+ processDC1_REPLICA.waitFor(1, TimeUnit.MINUTES);
+ processDC1_REPLICA = null;
+ }
+ if (processDC1 != null) {
+ processDC1.destroyForcibly();
+ processDC1.waitFor(1, TimeUnit.MINUTES);
+ processDC1 = null;
+ }
+ if (processDC2 != null) {
+ processDC2.destroyForcibly();
+ processDC2.waitFor(1, TimeUnit.MINUTES);
+ processDC2 = null;
+ }
+ }
+
+ private static final String DC1_IP = "localhost:61616";
+ private static final String DC1_BACKUP_IP = "localhost:61617";
+ private static final String DC2_IP = "localhost:61618";
+ private static final String DC2_BACKUP_IP = "localhost:61619";
+
+ private static String uri(String ip) {
+ return "tcp://" + ip;
+ }
+
+ private static String uriWithAlternate(String ip, String alternate) {
+ return "tcp://" + ip + "#tcp://" + alternate;
+ }
+
+ private static void createMirroredServer(String serverName,
+ String connectionName,
+ String mirrorURI,
+ int portOffset,
+ boolean replicated,
String clusterStatic) throws Exception {
+ File serverLocation = getFileServerLocation(serverName);
+ deleteDirectory(serverLocation);
+
+ HelperCreate cliCreateServer = new HelperCreate();
+      cliCreateServer.setAllowAnonymous(true).setArtemisInstance(serverLocation);
+ cliCreateServer.setNoWeb(true);
+      cliCreateServer.setArgs("--no-stomp-acceptor", "--no-hornetq-acceptor", "--no-mqtt-acceptor", "--no-amqp-acceptor", "--max-hops", "1", "--name", DC1_NODE);
+ cliCreateServer.addArgs("--queues", QUEUE_NAME);
+ cliCreateServer.setPortOffset(portOffset);
+ if (replicated) {
+ cliCreateServer.setReplicated(true);
+ cliCreateServer.setStaticCluster(clusterStatic);
+ cliCreateServer.setClustered(true);
+ } else {
+ cliCreateServer.setClustered(false);
+ }
+
+ cliCreateServer.createServer();
+
+ Properties brokerProperties = new Properties();
+ brokerProperties.put("messageExpiryScanPeriod", "1000");
+      brokerProperties.put("AMQPConnections." + connectionName + ".uri", mirrorURI);
+      brokerProperties.put("AMQPConnections." + connectionName + ".retryInterval", "1000");
+      brokerProperties.put("AMQPConnections." + connectionName + ".type", AMQPBrokerConnectionAddressType.MIRROR.toString());
+      brokerProperties.put("AMQPConnections." + connectionName + ".connectionElements.mirror.sync", "false");
+ brokerProperties.put("largeMessageSync", "false");
+
+ brokerProperties.put("addressSettings.#.maxSizeMessages", "50000");
+ brokerProperties.put("addressSettings.#.maxReadPageMessages", "2000");
+ brokerProperties.put("addressSettings.#.maxReadPageBytes", "-1");
+ brokerProperties.put("addressSettings.#.prefetchPageMessages", "500");
+      // if we don't use pageTransactions we may eventually get a few duplicates
+ brokerProperties.put("mirrorPageTransaction", "true");
+
+      File brokerPropertiesFile = new File(serverLocation, "broker.properties");
+ saveProperties(brokerProperties, brokerPropertiesFile);
+
+ if (TRACE_LOGS) {
+ replaceLogs(serverLocation);
+ }
+
+ }
+
+ private static void replaceLogs(File serverLocation) throws Exception {
+ File log4j = new File(serverLocation, "/etc/log4j2.properties");
+      assertTrue(FileUtil.findReplace(log4j, "logger.artemis_utils.level=INFO",
+                 "logger.artemis_utils.level=INFO\n" + "\n" +
+                 "logger.endpoint.name=org.apache.activemq.artemis.core.replication.ReplicationEndpoint\n" +
+                 "logger.endpoint.level=DEBUG\n" +
+                 "logger.ackmanager.name=org.apache.activemq.artemis.protocol.amqp.connect.mirror.AckManager\n" +
+                 "logger.ackmanager.level=INFO\n" +
+
+                 "logger.mirrorTarget.name=org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerTarget\n" +
+                 "logger.mirrorTarget.level=INFO\n" +
+
+                 "appender.console.filter.threshold.type = ThresholdFilter\n" +
+                 "appender.console.filter.threshold.level = trace"));
+ }
+
+ private static void createMirroredBackupServer(String serverName,
+ int portOffset,
+ String clusterStatic,
+                                                  String mirrorURI) throws Exception {
+ File serverLocation = getFileServerLocation(serverName);
+ deleteDirectory(serverLocation);
+
+ HelperCreate cliCreateServer = new HelperCreate();
+      cliCreateServer.setAllowAnonymous(true).setArtemisInstance(serverLocation);
+ cliCreateServer.setMessageLoadBalancing("ON_DEMAND");
+      cliCreateServer.setArgs("--no-stomp-acceptor", "--no-hornetq-acceptor", "--no-mqtt-acceptor", "--no-amqp-acceptor", "--max-hops", "1", "--name", DC1_NODE);
+ cliCreateServer.setPortOffset(portOffset);
+ cliCreateServer.setClustered(true);
+ cliCreateServer.setReplicated(true);
+ cliCreateServer.setBackup(true);
+ cliCreateServer.setStaticCluster(clusterStatic);
+ cliCreateServer.createServer();
+
+ Properties brokerProperties = new Properties();
+ brokerProperties.put("messageExpiryScanPeriod", "1000");
+ brokerProperties.put("AMQPConnections.mirror.uri", mirrorURI);
+ brokerProperties.put("AMQPConnections.mirror.retryInterval", "1000");
+      brokerProperties.put("AMQPConnections.mirror.type", AMQPBrokerConnectionAddressType.MIRROR.toString());
+      brokerProperties.put("AMQPConnections.mirror.connectionElements.mirror.sync", "false");
+ brokerProperties.put("largeMessageSync", "false");
+
+ brokerProperties.put("addressSettings.#.maxSizeMessages", "1");
+ brokerProperties.put("addressSettings.#.maxReadPageMessages", "2000");
+ brokerProperties.put("addressSettings.#.maxReadPageBytes", "-1");
+ brokerProperties.put("addressSettings.#.prefetchPageMessages", "500");
+
+      // if we don't use pageTransactions we may eventually get a few duplicates
+ brokerProperties.put("mirrorPageTransaction", "true");
+      File brokerPropertiesFile = new File(serverLocation, "broker.properties");
+ saveProperties(brokerProperties, brokerPropertiesFile);
+
+ File brokerXml = new File(serverLocation, "/etc/broker.xml");
+ assertTrue(brokerXml.exists());
+ // Adding redistribution delay to broker configuration
+      assertTrue(FileUtil.findReplace(brokerXml, "<address-setting match=\"#\">", "<address-setting match=\"#\">\n\n" + "         <redistribution-delay>0</redistribution-delay> <!
Issue Time Tracking
-------------------
Worklog Id: (was: 932375)
Time Spent: 5h 20m (was: 5h 10m)
> Add configuration option to relax syncs journal replication for Mirror Target
> -----------------------------------------------------------------------------
>
> Key: ARTEMIS-5001
> URL: https://issues.apache.org/jira/browse/ARTEMIS-5001
> Project: ActiveMQ Artemis
> Issue Type: Improvement
> Affects Versions: 2.37.0
> Reporter: Clebert Suconic
> Assignee: Clebert Suconic
> Priority: Major
> Fix For: 2.38.0
>
> Time Spent: 5h 20m
> Remaining Estimate: 0h
>
> When I worked on AMQP Mirror I did not actually envision it being used with
> journal replication. I actually thought more about adding multiple mirrored
> options instead.
> However, a user reported to me that when using mirror and journal replication
> combined, the sends could take a lot longer to happen (some normal latency)
> and the acks would eventually be missed.
> I should add an option to ignore the replication for the Mirror Target.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact