[ 
https://issues.apache.org/jira/browse/ARTEMIS-4136?focusedWorklogId=840136&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-840136
 ]

ASF GitHub Bot logged work on ARTEMIS-4136:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 19/Jan/23 01:30
            Start Date: 19/Jan/23 01:30
    Worklog Time Spent: 10m 
      Work Description: clebertsuconic commented on code in PR #4338:
URL: https://github.com/apache/activemq-artemis/pull/4338#discussion_r1080725455


##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPSyncMirrorTest.java:
##########
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp.connect;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.core.config.Configuration;
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
+import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration;
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.journal.EncodingSupport;
+import org.apache.activemq.artemis.core.journal.IOCompletion;
+import org.apache.activemq.artemis.core.journal.Journal;
+import org.apache.activemq.artemis.core.journal.JournalUpdateCallback;
+import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
+import org.apache.activemq.artemis.core.persistence.Persister;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
+import 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import 
org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
+import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
+import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule;
+import 
org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.ExecutorFactory;
+import org.apache.activemq.artemis.utils.ReusableLatch;
+import org.apache.activemq.artemis.utils.Wait;
+import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQPSyncMirrorTest extends AmqpClientTestSupport {
+
+   Logger logger = LoggerFactory.getLogger(AMQPSyncMirrorTest.class);
+
+   private static final String SLOW_SERVER_NAME = "slow";
+   private static final int SLOW_SERVER_PORT = AMQP_PORT + 1;
+
+   private ActiveMQServer slowServer;
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+   }
+
+   @Override
+   protected String getConfiguredProtocols() {
+      return "AMQP,OPENWIRE,CORE";
+   }
+
+   @Test
+   public void testPersistedSendAMQP() throws Exception {
+      testPersistedSend("AMQP", false, 100);
+   }
+
+   @Test
+   public void testPersistedSendAMQPLarge() throws Exception {
+      testPersistedSend("AMQP", false, 200 * 1024);
+   }
+
+
+   @Test
+   public void testPersistedSendCore() throws Exception {
+      testPersistedSend("CORE", false, 100);
+   }
+
+   @Test
+   public void testPersistedSendCoreLarge() throws Exception {
+      testPersistedSend("CORE", false, 200 * 1024);
+   }
+
+   @Test
+   public void testPersistedSendAMQPTXLarge() throws Exception {
+      testPersistedSend("AMQP", true, 200 * 1024);
+   }
+
+   @Test
+   public void testPersistedSendAMQPTX() throws Exception {
+      testPersistedSend("AMQP", true, 100);
+   }
+
+   @Test
+   public void testPersistedSendCoreTX() throws Exception {
+      testPersistedSend("CORE", true, 100);
+   }
+
+   @Test
+   public void testPersistedSendCoreTXLarge() throws Exception {
+      testPersistedSend("CORE", true, 200 * 1024);
+   }
+
+   private void testPersistedSend(String protocol, boolean transactional, int 
messageSize) throws Exception {
+      ReusableLatch sendPending = new ReusableLatch(0);
+      Semaphore semSend = new Semaphore(1);
+      Semaphore semAck = new Semaphore(1);
+      AtomicInteger errors = new AtomicInteger(0);
+
+      try {
+         final int NUMBER_OF_MESSAGES = 10;
+
+         AtomicInteger countStored = new AtomicInteger(0);
+
+         slowServer = createServerWithCallbackStorage(SLOW_SERVER_PORT, 
SLOW_SERVER_NAME, (isUpdate, isTX, txId, id, recordType, persister, record) -> {
+            if (logger.isDebugEnabled()) {
+               logger.debug("StorageCallback::slow isUpdate={}, isTX={}, 
txID={}, id={},recordType={}, record={}", isUpdate, isTX, txId, id, recordType, 
record);
+            }
+            if (transactional) {
+               if (isTX) {
+                  try {
+                     if (countStored.get() > 0) {
+                        countStored.incrementAndGet();
+                        logger.trace("semSend.tryAcquire");
+                        if (semSend.tryAcquire(20, TimeUnit.SECONDS)) {
+                           logger.trace("acquired TX, now release");
+                           semSend.release();
+                        }
+                     }
+                  } catch (Exception e) {
+                     e.printStackTrace();
+                  }
+               }
+            }
+            if (recordType == JournalRecordIds.ACKNOWLEDGE_REF) {
+               logger.debug("slow ACK REF");
+               try {
+                  if (semAck.tryAcquire(20, TimeUnit.SECONDS)) {
+                     semAck.release();
+                     logger.trace("slow acquired ACK semaphore");
+                  } else {
+                     logger.trace("Semaphore wasn't acquired");
+                  }
+               } catch (Exception e) {
+                  logger.warn(e.getMessage(), e);
+               }
+            }
+            if (recordType == JournalRecordIds.ADD_MESSAGE_PROTOCOL) {
+               try {
+                  countStored.incrementAndGet();
+                  if (!transactional) {
+                     logger.trace("semSend.tryAcquire");
+                     if (semSend.tryAcquire(20, TimeUnit.SECONDS)) {
+                        logger.trace("acquired non TX now release");
+                        semSend.release();
+                     }
+                  }
+               } catch (Exception e) {
+                  logger.warn(e.getMessage(), e);
+                  errors.incrementAndGet();
+               }
+            }
+         });
+         slowServer.setIdentity("slowServer");
+         server.setIdentity("server");
+
+         ExecutorService pool = Executors.newFixedThreadPool(5);
+         runAfter(pool::shutdown);
+
+         AMQPMirrorBrokerConnectionElement replication = 
configureMirrorTowardsSlow(server);
+
+         slowServer.getConfiguration().setName("slow");
+         server.getConfiguration().setName("fast");
+         slowServer.start();
+         server.start();
+
+         waitForServerToStart(slowServer);
+         waitForServerToStart(server);
+
+         server.addAddressInfo(new 
AddressInfo(getQueueName()).addRoutingType(RoutingType.ANYCAST).setAutoCreated(false));
+         server.createQueue(new 
QueueConfiguration(getQueueName()).setRoutingType(RoutingType.ANYCAST).setAddress(getQueueName()).setAutoCreated(false));
+
+         Wait.waitFor(() -> slowServer.locateQueue(getQueueName()) != null);
+         Queue replicatedQueue = slowServer.locateQueue(getQueueName());
+
+         ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, 
"tcp://localhost:" + AMQP_PORT);
+
+         if (factory instanceof ActiveMQConnectionFactory) {
+            ((ActiveMQConnectionFactory) 
factory).getServerLocator().setBlockOnAcknowledge(true);
+         }
+
+         Connection connection = factory.createConnection();
+         runAfter(connection::close);
+         Session session = connection.createSession(transactional, 
transactional ? Session.SESSION_TRANSACTED : Session.CLIENT_ACKNOWLEDGE);
+         MessageProducer producer = 
session.createProducer(session.createQueue(getQueueName()));
+
+         connection.start();
+
+         producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+         final String bodyMessage;
+         {
+            StringBuffer buffer = new StringBuffer();
+            for (int i = 0; i < messageSize; i++) {
+               buffer.append("large Buffer...");
+            }
+            bodyMessage = buffer.toString();
+         }
+
+         for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+            if (logger.isTraceEnabled()) {
+               logger.trace("===>>> send message {}", i);
+            }
+            int theI = i;
+            sendPending.countUp();
+            logger.trace("semSend.acquire");
+            semSend.acquire();
+            if (!transactional) {
+               pool.execute(() -> {
+                  try {
+                     if (logger.isTraceEnabled()) {
+                        logger.trace("Entering non TX send with sendPending = 
" + sendPending.getCount());
+                     }
+                     TextMessage message = 
session.createTextMessage(bodyMessage);
+                     message.setStringProperty("strProperty", "" + theI);
+                     producer.send(message);
+                     sendPending.countDown();
+                     if (logger.isTraceEnabled()) {
+                        logger.trace("leaving non TX send with sendPending = " 
+ sendPending.getCount());
+                     }
+                  } catch (Throwable e) {
+                     logger.warn(e.getMessage(), e);
+                     errors.incrementAndGet();
+                  }
+               });
+            } else {
+               CountDownLatch sendDone = new CountDownLatch(1);
+               pool.execute(() -> {
+                  try {
+                     TextMessage message = 
session.createTextMessage(bodyMessage);
+                     message.setStringProperty("strProperty", "" + theI);
+                     producer.send(message);
+                  } catch (Throwable e) {
+                     errors.incrementAndGet();
+                     logger.warn(e.getMessage(), e);
+                  }
+                  sendDone.countDown();
+               });
+
+               Wait.assertEquals(i, replicatedQueue::getMessageCount);
+
+               Assert.assertTrue(sendDone.await(10, TimeUnit.SECONDS));
+
+               pool.execute(() -> {
+                  try {
+                     session.commit();
+                     sendPending.countDown();
+                  } catch (Throwable e) {
+                     e.printStackTrace();
+                  }
+               });
+            }
+
+            Assert.assertFalse("Send epending not supposed to succeed", 
sendPending.await(10, TimeUnit.MILLISECONDS));

Review Comment:
   thanks!





Issue Time Tracking
-------------------

    Worklog Id:     (was: 840136)
    Time Spent: 40m  (was: 0.5h)

> Mirror sync replication
> -----------------------
>
>                 Key: ARTEMIS-4136
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-4136
>             Project: ActiveMQ Artemis
>          Issue Type: New Feature
>            Reporter: Clebert Suconic
>            Assignee: Clebert Suconic
>            Priority: Major
>             Fix For: 2.28.0
>
>          Time Spent: 40m
>  Remaining Estimate: 0h
>
> I'm adding an option sync=true|false on mirror.
> It will be possible to configure a mirror like this:
>      <broker-connections>
>          <amqp-connection uri="tcp://test1:111" name="test1" 
> retry-interval="333" reconnect-attempts="33" user="testuser" 
> password="testpassword">
>             <mirror sync="true"/>
>        </amqp-connection>
>    </broker-connections>
> If sync is set to true, any blocking client operation will wait for a mirror 
> callback.
> With that option set, any blocking operation on the broker will wait for a mirror 
> round trip:
> tx.commit(), session.send (non-transactional), and client.ack (when configured as 
> sync).
> Note that in AMQP, client dispositions are always asynchronous; hence it is 
> only possible to sync acks when using transactions with AMQP.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to