[
https://issues.apache.org/jira/browse/ARTEMIS-4136?focusedWorklogId=840136&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-840136
]
ASF GitHub Bot logged work on ARTEMIS-4136:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 19/Jan/23 01:30
Start Date: 19/Jan/23 01:30
Worklog Time Spent: 10m
Work Description: clebertsuconic commented on code in PR #4338:
URL: https://github.com/apache/activemq-artemis/pull/4338#discussion_r1080725455
##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPSyncMirrorTest.java:
##########
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp.connect;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.core.config.Configuration;
+import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
+import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
+import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration;
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.journal.EncodingSupport;
+import org.apache.activemq.artemis.core.journal.IOCompletion;
+import org.apache.activemq.artemis.core.journal.Journal;
+import org.apache.activemq.artemis.core.journal.JournalUpdateCallback;
+import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
+import org.apache.activemq.artemis.core.persistence.Persister;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
+import
org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import
org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
+import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
+import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule;
+import
org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.ExecutorFactory;
+import org.apache.activemq.artemis.utils.ReusableLatch;
+import org.apache.activemq.artemis.utils.Wait;
+import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQPSyncMirrorTest extends AmqpClientTestSupport {
+
+ Logger logger = LoggerFactory.getLogger(AMQPSyncMirrorTest.class);
+
+ private static final String SLOW_SERVER_NAME = "slow";
+ private static final int SLOW_SERVER_PORT = AMQP_PORT + 1;
+
+ private ActiveMQServer slowServer;
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ }
+
+ @Override
+ protected String getConfiguredProtocols() {
+ return "AMQP,OPENWIRE,CORE";
+ }
+
+ @Test
+ public void testPersistedSendAMQP() throws Exception {
+ testPersistedSend("AMQP", false, 100);
+ }
+
+ @Test
+ public void testPersistedSendAMQPLarge() throws Exception {
+ testPersistedSend("AMQP", false, 200 * 1024);
+ }
+
+
+ @Test
+ public void testPersistedSendCore() throws Exception {
+ testPersistedSend("CORE", false, 100);
+ }
+
+ @Test
+ public void testPersistedSendCoreLarge() throws Exception {
+ testPersistedSend("CORE", false, 200 * 1024);
+ }
+
+ @Test
+ public void testPersistedSendAMQPTXLarge() throws Exception {
+ testPersistedSend("AMQP", true, 200 * 1024);
+ }
+
+ @Test
+ public void testPersistedSendAMQPTX() throws Exception {
+ testPersistedSend("AMQP", true, 100);
+ }
+
+ @Test
+ public void testPersistedSendCoreTX() throws Exception {
+ testPersistedSend("CORE", true, 100);
+ }
+
+ @Test
+ public void testPersistedSendCoreTXLarge() throws Exception {
+ testPersistedSend("CORE", true, 200 * 1024);
+ }
+
+ private void testPersistedSend(String protocol, boolean transactional, int
messageSize) throws Exception {
+ ReusableLatch sendPending = new ReusableLatch(0);
+ Semaphore semSend = new Semaphore(1);
+ Semaphore semAck = new Semaphore(1);
+ AtomicInteger errors = new AtomicInteger(0);
+
+ try {
+ final int NUMBER_OF_MESSAGES = 10;
+
+ AtomicInteger countStored = new AtomicInteger(0);
+
+ slowServer = createServerWithCallbackStorage(SLOW_SERVER_PORT,
SLOW_SERVER_NAME, (isUpdate, isTX, txId, id, recordType, persister, record) -> {
+ if (logger.isDebugEnabled()) {
+ logger.debug("StorageCallback::slow isUpdate={}, isTX={},
txID={}, id={},recordType={}, record={}", isUpdate, isTX, txId, id, recordType,
record);
+ }
+ if (transactional) {
+ if (isTX) {
+ try {
+ if (countStored.get() > 0) {
+ countStored.incrementAndGet();
+ logger.trace("semSend.tryAcquire");
+ if (semSend.tryAcquire(20, TimeUnit.SECONDS)) {
+ logger.trace("acquired TX, now release");
+ semSend.release();
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ if (recordType == JournalRecordIds.ACKNOWLEDGE_REF) {
+ logger.debug("slow ACK REF");
+ try {
+ if (semAck.tryAcquire(20, TimeUnit.SECONDS)) {
+ semAck.release();
+ logger.trace("slow acquired ACK semaphore");
+ } else {
+ logger.trace("Semaphore wasn't acquired");
+ }
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ }
+ }
+ if (recordType == JournalRecordIds.ADD_MESSAGE_PROTOCOL) {
+ try {
+ countStored.incrementAndGet();
+ if (!transactional) {
+ logger.trace("semSend.tryAcquire");
+ if (semSend.tryAcquire(20, TimeUnit.SECONDS)) {
+ logger.trace("acquired non TX now release");
+ semSend.release();
+ }
+ }
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ errors.incrementAndGet();
+ }
+ }
+ });
+ slowServer.setIdentity("slowServer");
+ server.setIdentity("server");
+
+ ExecutorService pool = Executors.newFixedThreadPool(5);
+ runAfter(pool::shutdown);
+
+ AMQPMirrorBrokerConnectionElement replication =
configureMirrorTowardsSlow(server);
+
+ slowServer.getConfiguration().setName("slow");
+ server.getConfiguration().setName("fast");
+ slowServer.start();
+ server.start();
+
+ waitForServerToStart(slowServer);
+ waitForServerToStart(server);
+
+ server.addAddressInfo(new
AddressInfo(getQueueName()).addRoutingType(RoutingType.ANYCAST).setAutoCreated(false));
+ server.createQueue(new
QueueConfiguration(getQueueName()).setRoutingType(RoutingType.ANYCAST).setAddress(getQueueName()).setAutoCreated(false));
+
+ Wait.waitFor(() -> slowServer.locateQueue(getQueueName()) != null);
+ Queue replicatedQueue = slowServer.locateQueue(getQueueName());
+
+ ConnectionFactory factory = CFUtil.createConnectionFactory(protocol,
"tcp://localhost:" + AMQP_PORT);
+
+ if (factory instanceof ActiveMQConnectionFactory) {
+ ((ActiveMQConnectionFactory)
factory).getServerLocator().setBlockOnAcknowledge(true);
+ }
+
+ Connection connection = factory.createConnection();
+ runAfter(connection::close);
+ Session session = connection.createSession(transactional,
transactional ? Session.SESSION_TRANSACTED : Session.CLIENT_ACKNOWLEDGE);
+ MessageProducer producer =
session.createProducer(session.createQueue(getQueueName()));
+
+ connection.start();
+
+ producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+ final String bodyMessage;
+ {
+ StringBuffer buffer = new StringBuffer();
+ for (int i = 0; i < messageSize; i++) {
+ buffer.append("large Buffer...");
+ }
+ bodyMessage = buffer.toString();
+ }
+
+ for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+ if (logger.isTraceEnabled()) {
+ logger.trace("===>>> send message {}", i);
+ }
+ int theI = i;
+ sendPending.countUp();
+ logger.trace("semSend.acquire");
+ semSend.acquire();
+ if (!transactional) {
+ pool.execute(() -> {
+ try {
+ if (logger.isTraceEnabled()) {
+ logger.trace("Entering non TX send with sendPending =
" + sendPending.getCount());
+ }
+ TextMessage message =
session.createTextMessage(bodyMessage);
+ message.setStringProperty("strProperty", "" + theI);
+ producer.send(message);
+ sendPending.countDown();
+ if (logger.isTraceEnabled()) {
+ logger.trace("leaving non TX send with sendPending = "
+ sendPending.getCount());
+ }
+ } catch (Throwable e) {
+ logger.warn(e.getMessage(), e);
+ errors.incrementAndGet();
+ }
+ });
+ } else {
+ CountDownLatch sendDone = new CountDownLatch(1);
+ pool.execute(() -> {
+ try {
+ TextMessage message =
session.createTextMessage(bodyMessage);
+ message.setStringProperty("strProperty", "" + theI);
+ producer.send(message);
+ } catch (Throwable e) {
+ errors.incrementAndGet();
+ logger.warn(e.getMessage(), e);
+ }
+ sendDone.countDown();
+ });
+
+ Wait.assertEquals(i, replicatedQueue::getMessageCount);
+
+ Assert.assertTrue(sendDone.await(10, TimeUnit.SECONDS));
+
+ pool.execute(() -> {
+ try {
+ session.commit();
+ sendPending.countDown();
+ } catch (Throwable e) {
+ e.printStackTrace();
+ }
+ });
+ }
+
+ Assert.assertFalse("Send epending not supposed to succeed",
sendPending.await(10, TimeUnit.MILLISECONDS));
Review Comment:
thanks!
Issue Time Tracking
-------------------
Worklog Id: (was: 840136)
Time Spent: 40m (was: 0.5h)
> Mirror sync replication
> -----------------------
>
> Key: ARTEMIS-4136
> URL: https://issues.apache.org/jira/browse/ARTEMIS-4136
> Project: ActiveMQ Artemis
> Issue Type: New Feature
> Reporter: Clebert Suconic
> Assignee: Clebert Suconic
> Priority: Major
> Fix For: 2.28.0
>
> Time Spent: 40m
> Remaining Estimate: 0h
>
> I'm adding an option sync=true|false on the mirror.
> It will be possible to configure a mirror like this:
> <broker-connections>
> <amqp-connection uri="tcp://test1:111" name="test1"
> retry-interval="333" reconnect-attempts="33" user="testuser"
> password="testpassword">
> <mirror sync="true"/>
> </amqp-connection>
> </broker-connections>
> If sync is set to true, any blocking client operation will wait for a mirror
> callback.
> With that option set, any blocking operation on the broker will wait for a mirror
> round trip:
> tx.commit(), session.send (non-transactional), client.ack (when configured as
> sync).
> Note that in AMQP, client dispositions are always asynchronous; hence it's
> only possible to sync acks for AMQP when using transactions.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)