gemmellr commented on code in PR #5172:
URL: https://github.com/apache/activemq-artemis/pull/5172#discussion_r1736009266


##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonAbstractReceiver.java:
##########
@@ -91,8 +92,8 @@ public AMQPSessionContext getSessionContext() {
       return protonSession;
    }
 
-   protected void recoverContext() {
-      sessionSPI.recoverContext();
+   protected OperationContext recoverContext() {

Review Comment:
   Think this could use the matching javadoc to describe that it is really the 
_current/old_ context being returned, not the 'recovered' one that will be used 
afterwards.



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java:
##########
@@ -377,6 +428,30 @@ public String toString() {
       }
    }
 
+
+   static final class IgnoreReplicationTaskHolder {
+      @Override
+      public String toString() {
+         return "TaskHolder [storeLined=" + storeLined +

Review Comment:
   c&p mismatch of class name



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java:
##########
@@ -419,21 +494,17 @@ public boolean waitCompletion(final long timeout) throws 
InterruptedException, A
 
    @Override
    public String toString() {
-      return "OperationContextImpl [" + hashCode() + "] [minimalStore=" + 
minimalStore +
+      return "OperationContextImpl [" + hashCode() +

Review Comment:
   The hashcode used to be separated on its own, "[hashcode][others=foo]", did 
you mean to put them all in the same block?
   
   Also, would it be worth switching to System.identityHashCode(this) to make 
clearer thats what value is desired, given this doenst even implement 
hashCode().



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java:
##########
@@ -139,7 +140,14 @@ public void endRetry() {
 
       // schedule a retry
       if (!sortRetries().isEmpty()) {
-         scheduledComponent.delay();
+         ActiveMQScheduledComponent scheduleComponentReference = 
scheduledComponent;
+         if (scheduleComponentReference != null) {
+            try {
+               scheduleComponentReference.delay();
+            } catch (RejectedExecutionException thatsOK) {
+               logger.debug(thatsOK.getMessage(), thatsOK);

Review Comment:
   Logging _thatsOk_ already prints its message, so rather than simply 
duplicating it, adding a useful description message would be helpful to anyone 
that later encounters this logging and stacktrace. E.g to understand _what_ was 
rejected without having to go look up the code.



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java:
##########
@@ -165,56 +185,74 @@ public synchronized void replicationDone() {
 
    @Override
    public void executeOnCompletion(IOCallback runnable) {
-      executeOnCompletion(runnable, false);
+      executeOnCompletion(runnable, OperationConsistencyLevel.FULL);
    }
 
    @Override
-   public void executeOnCompletion(final IOCallback completion, final boolean 
storeOnly) {
+   public void executeOnCompletion(final IOCallback completion, final 
OperationConsistencyLevel consistencyLevel) {
       boolean executeNow = false;
 
       synchronized (this) {
          if (errorCode == -1) {
             final long storeLined = STORE_LINEUP_UPDATER.get(this);
             final long pageLined = PAGE_LINEUP_UPDATER.get(this);
             final long replicationLined = REPLICATION_LINEUP_UPDATER.get(this);
-            if (storeOnly) {
-               if (storeOnlyTasks == null) {
-                  storeOnlyTasks = new LinkedList<>();
-               }
-            } else {
-               if (tasks == null) {
-                  tasks = new LinkedList<>();
-                  minimalReplicated = replicationLined;
-                  minimalStore = storeLined;
-                  minimalPage = pageLined;
-               }
-            }
-            // On this case, we can just execute the context directly
-
-            if (replicationLined == replicated && storeLined == stored && 
pageLined == paged) {
-               // We want to avoid the executor if everything is complete...
-               // However, we can't execute the context if there are 
executions pending
-               // We need to use the executor on this case
-               if (EXECUTORS_PENDING_UPDATER.get(this) == 0) {
-                  // No need to use an executor here or a context switch
-                  // there are no actions pending.. hence we can just execute 
the task directly on the same thread
-                  executeNow = true;
-               } else {
-                  execute(completion);
-               }
-            } else {
-               if (storeOnly) {
-                  if (storeLined == stored && 
EXECUTORS_PENDING_UPDATER.get(this) == 0) {
-                     executeNow = true;
+            switch (consistencyLevel) {
+               case STORAGE:
+                  if (storeOnlyTasks == null) {
+                     storeOnlyTasks = new LinkedList<>();
+                  }
+                  if (storeLined == stored) {
+                     if (hasNoPendingExecution()) {
+                        executeNow = true;
+                     } else {
+                        execute(completion);
+                     }
                   } else {
-                     assert !storeOnlyTasks.isEmpty() ? 
storeOnlyTasks.peekLast().storeLined <= storeLined : true;
                      storeOnlyTasks.add(new StoreOnlyTaskHolder(completion, 
storeLined));
                   }
-               } else {
-                  // ensure total ordering
-                  assert validateTasksAdd(storeLined, replicationLined, 
pageLined);
-                  tasks.add(new TaskHolder(completion, storeLined, 
replicationLined, pageLined));
-               }
+                  break;
+
+               case IGNORE_REPLICATION:
+                  if (ignoreReplicationTasks == null) {
+                     ignoreReplicationTasks = new LinkedList<>();
+                  }
+
+                  if (storeLined == stored && pageLined == paged) {
+                     if (hasNoPendingExecution()) {
+                        // No need to use an executor here or a context switch
+                        // there are no actions pending.. hence we can just 
execute the task directly on the same thread

Review Comment:
   Bumping as the UI is hiding the thread. As before, either this comment isnt 
important enough to be needed at all, or it should really be on the _first_ 
instance this occurs in the method as well / instead (16 lines earlier).



##########
tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/brokerConnection/MirrorInfiniteRetryReplicaTest.java:
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.smoke.brokerConnection;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.io.File;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.management.SimpleManagement;
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
+import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.util.ServerUtil;
+import org.apache.activemq.artemis.utils.FileUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.apache.activemq.artemis.utils.cli.helper.HelperCreate;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class MirrorInfiniteRetryReplicaTest extends SmokeTestBase {
+
+   private static final String QUEUE_NAME = 
"MirrorInfiniteRetryReplicaTestQueue";
+
+   public static final String DC1_NODE = "MirrorInfiniteRetryReplicaTest/DC1";
+   public static final String DC2_NODE = "MirrorInfiniteRetryReplicaTest/DC2";
+   public static final String DC2_REPLICA_NODE = 
"MirrorInfiniteRetryReplicaTest/DC2_REPLICA";
+   public static final String DC1_REPLICA_NODE = 
"MirrorInfiniteRetryReplicaTest/DC1_REPLICA";
+
+   volatile Process processDC1;
+   volatile Process processDC2;
+   volatile Process processDC1_REPLICA;
+   volatile Process processDC2_REPLICA;
+
+
+   // change this to true to have the server producing more detailed logs
+   private static final boolean TRACE_LOGS = false;
+
+   @AfterEach
+   public void destroyServers() throws Exception {
+      if (processDC2_REPLICA != null) {
+         processDC2_REPLICA.destroyForcibly();
+         processDC2_REPLICA.waitFor(1, TimeUnit.MINUTES);
+         processDC2_REPLICA = null;
+      }
+      if (processDC1_REPLICA != null) {
+         processDC1_REPLICA.destroyForcibly();
+         processDC1_REPLICA.waitFor(1, TimeUnit.MINUTES);
+         processDC1_REPLICA = null;
+      }
+      if (processDC1 != null) {
+         processDC1.destroyForcibly();
+         processDC1.waitFor(1, TimeUnit.MINUTES);
+         processDC1 = null;
+      }
+      if (processDC2 != null) {
+         processDC2.destroyForcibly();
+         processDC2.waitFor(1, TimeUnit.MINUTES);
+         processDC2 = null;
+      }
+   }
+
+   private static final String DC1_IP = "localhost:61616";
+   private static final String DC1_BACKUP_IP = "localhost:61617";
+   private static final String DC2_IP = "localhost:61618";
+   private static final String DC2_BACKUP_IP = "localhost:61619";
+
+   private static String uri(String ip) {
+      return "tcp://" + ip;
+   }
+
+   private static String uriWithAlternate(String ip, String alternate) {
+      return "tcp://" + ip + "#tcp://" + alternate;
+   }
+
+   private static void createMirroredServer(String serverName,
+                                            String connectionName,
+                                            String mirrorURI,
+                                            int portOffset,
+                                            boolean replicated,
+                                            String clusterStatic) throws 
Exception {
+      File serverLocation = getFileServerLocation(serverName);
+      deleteDirectory(serverLocation);
+
+      HelperCreate cliCreateServer = new HelperCreate();
+      
cliCreateServer.setAllowAnonymous(true).setArtemisInstance(serverLocation);
+      cliCreateServer.setNoWeb(true);
+      cliCreateServer.setArgs("--no-stomp-acceptor", "--no-hornetq-acceptor", 
"--no-mqtt-acceptor", "--no-amqp-acceptor", "--max-hops", "1", "--name", 
DC1_NODE);
+      cliCreateServer.addArgs("--queues", QUEUE_NAME);
+      cliCreateServer.setPortOffset(portOffset);
+      if (replicated) {
+         cliCreateServer.setReplicated(true);
+         cliCreateServer.setStaticCluster(clusterStatic);
+         cliCreateServer.setClustered(true);
+      } else {
+         cliCreateServer.setClustered(false);
+      }
+
+      cliCreateServer.createServer();
+
+      Properties brokerProperties = new Properties();
+      brokerProperties.put("messageExpiryScanPeriod", "1000");
+      brokerProperties.put("AMQPConnections." + connectionName + ".uri", 
mirrorURI);
+      brokerProperties.put("AMQPConnections." + connectionName + 
".retryInterval", "1000");
+      brokerProperties.put("AMQPConnections." + connectionName + ".type", 
AMQPBrokerConnectionAddressType.MIRROR.toString());
+      brokerProperties.put("AMQPConnections." + connectionName + 
".connectionElements.mirror.sync", "false");
+      brokerProperties.put("largeMessageSync", "false");
+
+      brokerProperties.put("addressSettings.#.maxSizeMessages", "50000");
+      brokerProperties.put("addressSettings.#.maxReadPageMessages", "2000");
+      brokerProperties.put("addressSettings.#.maxReadPageBytes", "-1");
+      brokerProperties.put("addressSettings.#.prefetchPageMessages", "500");
+      // if we don't use pageTransactions we may eventually get a few 
duplicates
+      brokerProperties.put("mirrorPageTransaction", "true");
+
+      File brokerPropertiesFile = new File(serverLocation, 
"broker.properties");
+      saveProperties(brokerProperties, brokerPropertiesFile);
+
+      if (TRACE_LOGS) {
+         replaceLogs(serverLocation);
+      }
+
+   }
+
+   private static void replaceLogs(File serverLocation) throws Exception {
+      File log4j = new File(serverLocation, "/etc/log4j2.properties");
+      assertTrue(FileUtil.findReplace(log4j, "logger.artemis_utils.level=INFO",
+                                      "logger.artemis_utils.level=INFO\n" + 
"\n" +
+                                      
"logger.endpoint.name=org.apache.activemq.artemis.core.replication.ReplicationEndpoint\n"
 +
+                                      "logger.endpoint.level=DEBUG\n" +
+                                      
"logger.ackmanager.name=org.apache.activemq.artemis.protocol.amqp.connect.mirror.AckManager\n"
 +
+                                      "logger.ackmanager.level=INFO\n" +
+
+                                      
"logger.mirrorTarget.name=org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerTarget\n"
 +
+                                      "logger.mirrorTarget.level=INFO\n" +
+
+                                      "appender.console.filter.threshold.type 
= ThresholdFilter\n" +
+                                      "appender.console.filter.threshold.level 
= trace"));
+   }
+
+   private static void createMirroredBackupServer(String serverName,
+                                                  int portOffset,
+                                                  String clusterStatic,
+                                                  String mirrorURI) throws 
Exception {
+      File serverLocation = getFileServerLocation(serverName);
+      deleteDirectory(serverLocation);
+
+      HelperCreate cliCreateServer = new HelperCreate();
+      
cliCreateServer.setAllowAnonymous(true).setArtemisInstance(serverLocation);
+      cliCreateServer.setMessageLoadBalancing("ON_DEMAND");
+      cliCreateServer.setArgs("--no-stomp-acceptor", "--no-hornetq-acceptor", 
"--no-mqtt-acceptor", "--no-amqp-acceptor", "--max-hops", "1", "--name", 
DC1_NODE);
+      cliCreateServer.setPortOffset(portOffset);
+      cliCreateServer.setClustered(true);
+      cliCreateServer.setReplicated(true);
+      cliCreateServer.setBackup(true);
+      cliCreateServer.setStaticCluster(clusterStatic);
+      cliCreateServer.createServer();
+
+      Properties brokerProperties = new Properties();
+      brokerProperties.put("messageExpiryScanPeriod", "1000");
+      brokerProperties.put("AMQPConnections.mirror.uri", mirrorURI);
+      brokerProperties.put("AMQPConnections.mirror.retryInterval", "1000");
+      brokerProperties.put("AMQPConnections.mirror.type", 
AMQPBrokerConnectionAddressType.MIRROR.toString());
+      
brokerProperties.put("AMQPConnections.mirror.connectionElements.mirror.sync", 
"false");
+      brokerProperties.put("largeMessageSync", "false");
+
+      brokerProperties.put("addressSettings.#.maxSizeMessages", "1");
+      brokerProperties.put("addressSettings.#.maxReadPageMessages", "2000");
+      brokerProperties.put("addressSettings.#.maxReadPageBytes", "-1");
+      brokerProperties.put("addressSettings.#.prefetchPageMessages", "500");
+
+      // if we don't use pageTransactions we may eventually get a few 
duplicates
+      brokerProperties.put("mirrorPageTransaction", "true");
+      File brokerPropertiesFile = new File(serverLocation, 
"broker.properties");
+      saveProperties(brokerProperties, brokerPropertiesFile);
+
+      File brokerXml = new File(serverLocation, "/etc/broker.xml");
+      assertTrue(brokerXml.exists());
+      // Adding redistribution delay to broker configuration
+      assertTrue(FileUtil.findReplace(brokerXml, "<address-setting 
match=\"#\">", "<address-setting match=\"#\">\n\n" + "            
<redistribution-delay>0</redistribution-delay> <!-- added by 
SimpleMirrorSoakTest.java --> \n"));
+      assertTrue(FileUtil.findReplace(brokerXml, 
"<page-size-bytes>10M</page-size-bytes>", 
"<page-size-bytes>100K</page-size-bytes>"));
+
+      if (TRACE_LOGS) {
+         replaceLogs(serverLocation);
+      }
+   }
+
+   public static void createRealServers() throws Exception {
+      createMirroredServer(DC1_NODE, "mirror", uriWithAlternate(DC2_IP, 
DC2_BACKUP_IP), 0, true, uri(DC1_BACKUP_IP));
+      createMirroredBackupServer(DC1_REPLICA_NODE, 1, uri(DC1_IP), 
uriWithAlternate(DC2_IP, DC2_BACKUP_IP));
+      createMirroredServer(DC2_NODE, "mirror", uriWithAlternate(DC1_IP, 
DC1_BACKUP_IP), 2, true, uri(DC2_BACKUP_IP));
+      createMirroredBackupServer(DC2_REPLICA_NODE, 3, uri(DC2_IP), 
uriWithAlternate(DC1_IP, DC1_BACKUP_IP));
+   }
+
+   @Test
+   public void testConsumersAttached() throws Exception {
+      createRealServers();
+
+      SimpleManagement managementDC1 = new SimpleManagement(uri(DC1_IP), null, 
null);
+      SimpleManagement managementDC2 = new SimpleManagement(uri(DC2_IP), null, 
null);
+
+      processDC2 = startServer(DC2_NODE, -1, -1, new 
File(getServerLocation(DC2_NODE), "broker.properties"));
+      processDC2_REPLICA = startServer(DC2_REPLICA_NODE, -1, -1, new 
File(getServerLocation(DC2_REPLICA_NODE), "broker.properties"));
+
+      processDC1 = startServer(DC1_NODE, -1, -1, new 
File(getServerLocation(DC1_NODE), "broker.properties"));
+      processDC1_REPLICA = startServer(DC1_REPLICA_NODE, -1, -1, new 
File(getServerLocation(DC1_REPLICA_NODE), "broker.properties"));
+
+      ServerUtil.waitForServerToStart(2, 10_000);
+      Wait.assertTrue(managementDC2::isReplicaSync);
+
+      ServerUtil.waitForServerToStart(0, 10_000);
+      Wait.assertTrue(managementDC1::isReplicaSync);
+
+      runAfter(() -> managementDC1.close());
+      runAfter(() -> managementDC2.close());
+
+      ConnectionFactory connectionFactoryDC1A = 
CFUtil.createConnectionFactory("amqp", uri(DC1_IP));
+      try (Connection connection = connectionFactoryDC1A.createConnection()) {
+         Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+         MessageProducer producer = 
session.createProducer(session.createQueue(QUEUE_NAME));
+         TextMessage message = session.createTextMessage("Simple message");
+         message.setIntProperty("i", 1);
+         message.setBooleanProperty("large", false);
+         producer.send(message);
+         session.commit();
+      }
+
+      ConnectionFactory connectionFactoryDC2A = 
CFUtil.createConnectionFactory("amqp", uri(DC2_IP));
+      try (Connection connectionDC2 = 
connectionFactoryDC2A.createConnection(); Connection connectionDC1 = 
connectionFactoryDC1A.createConnection()) {
+         connectionDC2.start();
+         connectionDC1.start();
+
+         // we will receive the message and hold it...
+         Session sessionDC2 = connectionDC2.createSession(true, 
Session.SESSION_TRANSACTED);
+         Queue queue = sessionDC2.createQueue(QUEUE_NAME);
+         MessageConsumer consumerDC2 = sessionDC2.createConsumer(queue);
+         assertNotNull(consumerDC2.receive(5000));
+
+         Session sessionDC1 = connectionDC1.createSession(true, 
Session.SESSION_TRANSACTED);
+         MessageConsumer consumerDC1 = sessionDC1.createConsumer(queue);
+         assertNotNull(consumerDC1.receive(5000));
+         sessionDC1.commit();
+
+         assertEquals(1, managementDC2.getMessageCountOnQueue(QUEUE_NAME));
+
+         // we roll it back and close the consumer, the message should now be 
back to be retried correctly
+         sessionDC2.rollback();
+         consumerDC2.close();
+         Wait.assertEquals(0, () -> 
managementDC2.getDeliveringCountOnQueue(QUEUE_NAME));

Review Comment:
   Shouldn't it check the message count as well?



##########
tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/brokerConnection/MirrorInfiniteRetryReplicaTest.java:
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.smoke.brokerConnection;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.io.File;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.management.SimpleManagement;
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
+import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.util.ServerUtil;
+import org.apache.activemq.artemis.utils.FileUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.apache.activemq.artemis.utils.cli.helper.HelperCreate;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class MirrorInfiniteRetryReplicaTest extends SmokeTestBase {

Review Comment:
   Is this name still appropriate now?
   
   Either way, a small description of the test seems in order, it doesnt seem 
like it will be particularly obvious to anyone later from just the naming what 
MirrorInfiniteRetryReplicaTest.testConsumersAttached() is actually testing.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@activemq.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscr...@activemq.apache.org
For additional commands, e-mail: gitbox-h...@activemq.apache.org
For further information, visit: https://activemq.apache.org/contact


Reply via email to