lucastetreault commented on code in PR #848:
URL: https://github.com/apache/activemq/pull/848#discussion_r1062161446
##########
activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java:
##########
@@ -1296,9 +1297,36 @@ public QueueMessageReference getMessage(String id) {
return null;
}
+ public List<MessageId> getAllMessageIds() throws Exception {
Review Comment:
Seems like there is a risk of an out-of-memory error if you're going to
load all the messages from a queue into memory?
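
One option would be to hand the IDs to the caller in bounded pages rather than returning one big list. A rough sketch of the idea, assuming the IDs can be obtained lazily through an iterator (I haven't checked whether the store supports that); `PagedIdsSketch` and `forEachPage` are made-up names, not existing ActiveMQ APIs:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper, not part of the PR or of ActiveMQ.
class PagedIdsSketch {

    // Passes the ids to the handler in pages of at most pageSize elements,
    // so only one page is accumulated at a time.
    static <T> void forEachPage(Iterator<T> ids, int pageSize, Consumer<List<T>> handler) {
        if (pageSize <= 0) {
            throw new IllegalArgumentException("pageSize must be positive");
        }
        List<T> page = new ArrayList<>(pageSize);
        while (ids.hasNext()) {
            page.add(ids.next());
            if (page.size() == pageSize) {
                handler.accept(page);
                // Start a fresh list so the handler may keep the old page if it wants to.
                page = new ArrayList<>(pageSize);
            }
        }
        // Flush the final, possibly partial, page.
        if (!page.isEmpty()) {
            handler.accept(page);
        }
    }
}
```

This only bounds memory if the source iterator itself is lazy; if the IDs are already materialized in a list, paging on top of it does not help.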
##########
activemq-broker/src/main/java/org/apache/activemq/broker/Broker.java:
##########
@@ -412,5 +412,5 @@ public interface Broker extends Region, Service {
void networkBridgeStopped(BrokerInfo brokerInfo);
-
+ void queuePurged(ConnectionContext context, ActiveMQDestination destination);
Review Comment:
The verb makes me think this should be an event listener as opposed to a
method we call everywhere. Will check how this gets used later.
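
To illustrate what I mean by a listener, here is a minimal sketch. Everything in it is hypothetical (`QueuePurgeListener`, `QueuePurgeEventsSketch`, and the use of a plain `String` for the destination are my own placeholders), and it may turn out not to fit once I see how `queuePurged` is used:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical sketch, not an existing ActiveMQ type.
class QueuePurgeEventsSketch {

    // Callback invoked after a queue has been purged.
    interface QueuePurgeListener {
        void onQueuePurged(String destinationName);
    }

    // CopyOnWriteArrayList lets listeners be added while an event is being fired.
    private final List<QueuePurgeListener> listeners = new CopyOnWriteArrayList<>();

    void addListener(QueuePurgeListener listener) {
        listeners.add(listener);
    }

    // Called from the single place where the purge actually happens.
    void firePurged(String destinationName) {
        for (QueuePurgeListener listener : listeners) {
            listener.onQueuePurged(destinationName);
        }
    }
}
```

The point is that interested parties register themselves, so the `Broker` interface would not need a new method that every implementation has to carry.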
##########
activemq-broker/src/main/java/org/apache/activemq/broker/ConnectionContext.java:
##########
@@ -245,7 +245,7 @@ public String getUserName() {
return userName;
}
- protected void setUserName(String userName) {
+ public void setUserName(String userName) {
Review Comment:
Interested to see why this needs to be public. Will try to remember to come
back.
##########
activemq-broker/src/main/java/org/apache/activemq/replica/PeriodAcknowledge.java:
##########
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.replica;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQSession;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class PeriodAcknowledge {
Review Comment:
Rename to PeriodicAcknowledge?
##########
activemq-broker/src/main/java/org/apache/activemq/replica/PeriodAcknowledge.java:
##########
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.replica;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQSession;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class PeriodAcknowledge {
+
+ private static final int MAX_ACK_BATCH_SIZE = 100;
Review Comment:
Should this be configurable?
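
For example, the current value could stay as the default with an overloaded constructor for callers that want to tune it. A sketch only; the class name and the second constructor are hypothetical, and how the value would be wired in from the plugin configuration is left open:

```java
// Hypothetical sketch of a configurable batch size, not the PR's code.
class PeriodicAckConfigSketch {

    static final int DEFAULT_MAX_ACK_BATCH_SIZE = 100;

    private final long replicaAckPeriod;
    private final int maxAckBatchSize;

    // Keeps the existing one-argument constructor shape and falls back to the default.
    PeriodicAckConfigSketch(long replicaAckPeriod) {
        this(replicaAckPeriod, DEFAULT_MAX_ACK_BATCH_SIZE);
    }

    PeriodicAckConfigSketch(long replicaAckPeriod, int maxAckBatchSize) {
        if (maxAckBatchSize <= 0) {
            throw new IllegalArgumentException("maxAckBatchSize must be positive");
        }
        this.replicaAckPeriod = replicaAckPeriod;
        this.maxAckBatchSize = maxAckBatchSize;
    }

    long getReplicaAckPeriod() {
        return replicaAckPeriod;
    }

    int getMaxAckBatchSize() {
        return maxAckBatchSize;
    }
}
```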
##########
activemq-broker/src/main/java/org/apache/activemq/replica/PeriodAcknowledge.java:
##########
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.replica;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQSession;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class PeriodAcknowledge {
+
+ private static final int MAX_ACK_BATCH_SIZE = 100;
+ private boolean safeToAck = true;
+ private final AtomicLong lastAckTime = new AtomicLong();
+ private final AtomicInteger pendingAckCount = new AtomicInteger();
+ private final AtomicReference<ActiveMQConnection> connection = new
AtomicReference<>();
+ private final AtomicReference<ActiveMQSession> connectionSession = new
AtomicReference<>();
+ private final long replicaAckPeriod;
+ private final Object periodicCommitLock = new Object();
+
+
+ public PeriodAcknowledge(long replicaAckPeriod) {
+ this.replicaAckPeriod = replicaAckPeriod;
+ }
+
+ public void setConnection(ActiveMQConnection activeMQConnection) {
+ connection.set(activeMQConnection);
+ }
+
+ public void setConnectionSession(ActiveMQSession activeMQSession) {
+ connectionSession.set(activeMQSession);
+ }
+
+ public void setSafeToAck(boolean safeToAck) {
+ this.safeToAck = safeToAck;
+ }
+
+ private boolean shouldPeriodicallyCommit() {
Review Comment:
Class is already called PeriodAcknowledge and I think it's actually about
acknowledging so maybe rename to `shouldAcknowledge`?
##########
activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaBatcher.java:
##########
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.replica;
+
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.command.ActiveMQMessage;
+
+import javax.jms.JMSException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class ReplicaBatcher {
+
+ static final int MAX_BATCH_LENGTH = 500;
+ static final int MAX_BATCH_SIZE = 5_000_000; // 5 Mb
+
+ @SuppressWarnings("unchecked")
+ static List<List<MessageReference>> batches(List<MessageReference> list) throws Exception {
+ List<List<MessageReference>> result = new ArrayList<>();
+
+ Map<String, Set<String>> destination2eventType = new HashMap<>();
+ List<MessageReference> batch = new ArrayList<>();
+ int batchSize = 0;
+ for (MessageReference reference : list) {
+ ActiveMQMessage message = (ActiveMQMessage) reference.getMessage();
+ String originalDestination = message.getStringProperty(ReplicaSupport.ORIGINAL_MESSAGE_DESTINATION_PROPERTY);
+ ReplicaEventType currentEventType =
+ ReplicaEventType.valueOf(message.getStringProperty(ReplicaEventType.EVENT_TYPE_PROPERTY));
+
+ boolean eventTypeSwitch = false;
+ if (originalDestination != null) {
+ Set<String> sends = destination2eventType.computeIfAbsent(originalDestination, k -> new HashSet<>());
+ if (currentEventType == ReplicaEventType.MESSAGE_SEND) {
+ sends.add(message.getStringProperty(ReplicaSupport.MESSAGE_ID_PROPERTY));
+ }
+ if (currentEventType == ReplicaEventType.MESSAGE_ACK) {
+ List<String> stringProperty = (List<String>) message.getProperty(ReplicaSupport.MESSAGE_IDS_PROPERTY);
+ if (sends.stream().anyMatch(stringProperty::contains)) {
Review Comment:
Could be expensive with many sends and many acks: `stringProperty` is a
`List`, so each `contains` call is a linear scan. Is it worth converting
`stringProperty` to a `Set`?
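
Roughly what I have in mind, as a standalone sketch. `AckOverlapSketch` and `anyAcked` are hypothetical names; in the PR this would just be a couple of lines inside `batches`:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper, not part of the PR.
class AckOverlapSketch {

    // Returns true if any id in sends also appears in ackedIds.
    static boolean anyAcked(Set<String> sends, List<String> ackedIds) {
        // Copy the list into a HashSet once, so each lookup is O(1) on average
        // instead of a linear List.contains scan per send.
        Set<String> acked = new HashSet<>(ackedIds);
        return sends.stream().anyMatch(acked::contains);
    }
}
```

The copy costs one pass over the acked ids, so it only pays off when `sends` is not tiny, but it should never be much worse than the current version. Note this sketch assumes the property is non-null.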
##########
activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaBatcher.java:
##########
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.replica;
+
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.command.ActiveMQMessage;
+
+import javax.jms.JMSException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class ReplicaBatcher {
+
+ static final int MAX_BATCH_LENGTH = 500;
Review Comment:
Should these be configurable?
##########
activemq-broker/src/main/java/org/apache/activemq/replica/PeriodAcknowledge.java:
##########
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.replica;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQSession;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class PeriodAcknowledge {
+
+ private static final int MAX_ACK_BATCH_SIZE = 100;
+ private boolean safeToAck = true;
+ private final AtomicLong lastAckTime = new AtomicLong();
+ private final AtomicInteger pendingAckCount = new AtomicInteger();
+ private final AtomicReference<ActiveMQConnection> connection = new
AtomicReference<>();
+ private final AtomicReference<ActiveMQSession> connectionSession = new
AtomicReference<>();
+ private final long replicaAckPeriod;
+ private final Object periodicCommitLock = new Object();
+
+
+ public PeriodAcknowledge(long replicaAckPeriod) {
+ this.replicaAckPeriod = replicaAckPeriod;
+ }
+
+ public void setConnection(ActiveMQConnection activeMQConnection) {
+ connection.set(activeMQConnection);
+ }
+
+ public void setConnectionSession(ActiveMQSession activeMQSession) {
+ connectionSession.set(activeMQSession);
+ }
+
+ public void setSafeToAck(boolean safeToAck) {
+ this.safeToAck = safeToAck;
+ }
+
+ private boolean shouldPeriodicallyCommit() {
+ return System.currentTimeMillis() - lastAckTime.get() >= replicaAckPeriod;
+ }
+
+ private boolean reachedMaxAckBatchSize() {
+ return pendingAckCount.incrementAndGet() >= MAX_ACK_BATCH_SIZE;
Review Comment:
Is the increment an intended side-effect of this method?
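
If it is not intended, splitting the mutation from the query would make that explicit. A minimal sketch, where `AckCounterSketch` and `recordPendingAck` are hypothetical names of mine:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch, not the PR's class.
class AckCounterSketch {

    private static final int MAX_ACK_BATCH_SIZE = 100;

    private final AtomicInteger pendingAckCount = new AtomicInteger();

    // Explicit mutation: call once per message that is waiting to be acknowledged.
    void recordPendingAck() {
        pendingAckCount.incrementAndGet();
    }

    // Pure query: calling it repeatedly does not change the count.
    boolean reachedMaxAckBatchSize() {
        return pendingAckCount.get() >= MAX_ACK_BATCH_SIZE;
    }
}
```

With the current code, calling the check twice for the same message counts it twice, which is easy to trip over later.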
##########
activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java:
##########
@@ -2347,6 +2436,25 @@ public void
processDispatchNotification(MessageDispatchNotification messageDispa
Subscription sub = getMatchingSubscription(messageDispatchNotification);
if (sub != null) {
MessageReference message = getMatchingMessage(messageDispatchNotification);
+
+ pagedInMessagesLock.writeLock().lock();
Review Comment:
Is this necessary if the replication plugin isn't enabled?
##########
activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaBroker.java:
##########
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.replica;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQMessageConsumer;
+import org.apache.activemq.ActiveMQPrefetchPolicy;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.BrokerFilter;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.MessageDispatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.JMSException;
+import java.lang.reflect.Method;
+import java.text.MessageFormat;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static java.util.Objects.requireNonNull;
+
+public class ReplicaBroker extends BrokerFilter {
+
+ private final static long REPLICA_ACK_PERIOD = 5_000;
Review Comment:
Make configurable?
##########
activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaCompactor.java:
##########
@@ -0,0 +1,303 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.replica;
+
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ConsumerBrokerExchange;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.PrefetchSubscription;
+import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.broker.region.QueueMessageReference;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.LocalTransactionId;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatchNotification;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.util.LongSequenceGenerator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class ReplicaCompactor {
+ private static final Logger logger =
LoggerFactory.getLogger(ReplicaCompactor.class);
+ private static final String CONSUMER_SELECTOR = String.format("%s LIKE
'%s'", ReplicaEventType.EVENT_TYPE_PROPERTY, ReplicaEventType.MESSAGE_ACK);
+ public static final int MAXIMUM_MESSAGES = 1_000;
+
+ private final Broker broker;
+ private final ConnectionContext connectionContext;
+ private final ReplicaReplicationQueueSupplier queueProvider;
+ private final PrefetchSubscription subscription;
+
+ private final Queue intermediateQueue;
+
+ public ReplicaCompactor(Broker broker, ConnectionContext
connectionContext, ReplicaReplicationQueueSupplier queueProvider,
PrefetchSubscription subscription) {
+ this.broker = broker;
+ this.connectionContext = connectionContext;
+ this.queueProvider = queueProvider;
+ this.subscription = subscription;
+
+ intermediateQueue =
broker.getDestinations(queueProvider.getIntermediateQueue()).stream().findFirst()
+ .map(DestinationExtractor::extractQueue).orElseThrow();
+ }
+
+ List<MessageReference> compactAndFilter(List<MessageReference> list,
boolean withAdditionalMessages) throws Exception {
+ List<DeliveredMessageReference> toProcess = list.stream()
+ .map(DeliveredMessageReference::new)
+ .collect(Collectors.toList());
+
+ int prefetchSize = subscription.getPrefetchSize();
+ try {
+ if (withAdditionalMessages) {
+ subscription.setPrefetchSize(0);
+ toProcess.addAll(getAdditionalMessages());
+ }
+
+ List<DeliveredMessageReference> processed =
compactAndFilter0(toProcess);
+
+ Set<MessageId> messageIds =
list.stream().map(MessageReference::getMessageId).collect(Collectors.toSet());
+
+ return processed.stream()
+ .map(dmr -> dmr.messageReference)
+ .filter(mr -> messageIds.contains(mr.getMessageId()))
+ .collect(Collectors.toList());
+ } finally {
+ subscription.setPrefetchSize(prefetchSize);
+ }
+ }
+
+ private List<DeliveredMessageReference> getAdditionalMessages() throws
Exception {
+ List<DeliveredMessageReference> result = new ArrayList<>();
+ List<QueueMessageReference> additionalMessages =
intermediateQueue.getMatchingMessages(connectionContext, CONSUMER_SELECTOR,
MAXIMUM_MESSAGES);
+ if (additionalMessages.isEmpty()) {
+ return result;
+ }
+
+ String selector = String.format("%s IN %s",
ReplicaSupport.MESSAGE_ID_PROPERTY, getAckedMessageIds(additionalMessages));
+
additionalMessages.addAll(intermediateQueue.getMatchingMessages(connectionContext,
selector, MAXIMUM_MESSAGES));
+
+ Set<MessageId> dispatchedMessageIds =
subscription.getDispatched().stream()
+ .map(MessageReference::getMessageId)
+ .collect(Collectors.toSet());
+
+ for (MessageReference messageReference : additionalMessages) {
+ if
(!dispatchedMessageIds.contains(messageReference.getMessageId())) {
+ result.add(new DeliveredMessageReference(messageReference,
false));
+ }
+ }
+
+ return result;
+ }
+
+ private List<DeliveredMessageReference>
compactAndFilter0(List<DeliveredMessageReference> list) throws Exception {
Review Comment:
Just call it `compactAndFilter` since the signature is different?
##########
activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaBrokerEventListener.java:
##########
@@ -0,0 +1,482 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.replica;
+
+import org.apache.activemq.ScheduledMessage;
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ConsumerBrokerExchange;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.DurableTopicSubscription;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.MessageReferenceFilter;
+import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.LocalTransactionId;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatchNotification;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.TransactionId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static java.util.Objects.requireNonNull;
+
+public class ReplicaBrokerEventListener implements MessageListener {
+
+ private static final String REPLICATION_CONSUMER_CLIENT_ID =
"DUMMY_REPLICATION_CONSUMER";
+ private static final String SEQUENCE_NAME = "replicaSeq";
+ private final Logger logger =
LoggerFactory.getLogger(ReplicaBrokerEventListener.class);
+ private final ReplicaEventSerializer eventSerializer = new
ReplicaEventSerializer();
+ private final Broker broker;
+ private final ConnectionContext connectionContext;
+ private final ReplicaInternalMessageProducer
replicaInternalMessageProducer;
+
+ private final PeriodAcknowledge acknowledgeCallback;
+ final ReplicaSequenceStorage sequenceStorage;
+ BigInteger sequence;
+ MessageId sequenceMessageId;
+
+ ReplicaBrokerEventListener(Broker broker, ReplicaReplicationQueueSupplier
queueProvider, PeriodAcknowledge acknowledgeCallback) {
+ this.broker = requireNonNull(broker);
+ this.acknowledgeCallback = requireNonNull(acknowledgeCallback);
+ connectionContext = broker.getAdminConnectionContext().copy();
+
connectionContext.setUserName(ReplicaSupport.REPLICATION_PLUGIN_USER_NAME);
+ connectionContext.setClientId(REPLICATION_CONSUMER_CLIENT_ID);
+ connectionContext.setConnection(new DummyConnection());
+ replicaInternalMessageProducer = new
ReplicaInternalMessageProducer(broker, connectionContext);
+
+ createTransactionMapIfNotExist();
+
+ this.sequenceStorage = new ReplicaSequenceStorage(broker,
connectionContext,
+ queueProvider, replicaInternalMessageProducer, SEQUENCE_NAME);
+ }
+
+ public void initialize() throws Exception {
+ String savedSequence = sequenceStorage.initialize();
+ sequence = savedSequence == null ? null : new
BigInteger(savedSequence);
+ }
+
+ @Override
+ public void onMessage(Message jmsMessage) {
+ logger.trace("Received replication message from replica source");
+ ActiveMQMessage message = (ActiveMQMessage) jmsMessage;
+
+ processMessageWithRetries(message, null);
+ }
+
+ private synchronized void processMessageWithRetries(ActiveMQMessage
message, TransactionId transactionId) {
+ new ReplicaEventRetrier(() -> {
+ boolean commit = false;
+ TransactionId tid = transactionId;
+ if (tid == null) {
+ tid = new LocalTransactionId(
+ new
ConnectionId(ReplicaSupport.REPLICATION_PLUGIN_CONNECTION_ID),
+
ReplicaSupport.LOCAL_TRANSACTION_ID_GENERATOR.getNextSequenceId());
+
+ broker.beginTransaction(connectionContext, tid);
+
+ commit = true;
+ }
+
+ try {
+ ReplicaEventType eventType = getEventType(message);
+ if (eventType == ReplicaEventType.BATCH) {
+ processBatch(message, tid);
+ } else {
+ processMessage(message, eventType, tid);
+ }
+
+ if (commit) {
+ sequenceStorage.enqueue(tid, sequence.toString());
+
+ broker.commitTransaction(connectionContext, tid, true);
+ acknowledgeCallback.setSafeToAck(true);
+ }
+ } catch (Exception e) {
+ if (commit) {
+ broker.rollbackTransaction(connectionContext, tid);
+ }
+ acknowledgeCallback.setSafeToAck(false);
+ throw e;
+ }
+ return null;
+ }).process();
+ }
+
+ private void processMessage(ActiveMQMessage message, ReplicaEventType
eventType, TransactionId transactionId) throws Exception {
+ Object deserializedData =
eventSerializer.deserializeMessageData(message.getContent());
+ BigInteger newSequence = new
BigInteger(message.getStringProperty(ReplicaSupport.SEQUENCE_PROPERTY));
+
+ long sequenceDifference = sequence == null ? 0 :
newSequence.subtract(sequence).longValue();
+ MessageId messageId = message.getMessageId();
+ if (sequence == null || sequenceDifference == 1) {
+ processMessage(message, eventType, deserializedData,
transactionId);
+
+ sequence = newSequence;
+ sequenceMessageId = messageId;
+
+ } else if (sequenceDifference > 0) {
+ throw new IllegalStateException(String.format(
+ "Replication event is out of order. Current sequence: %s,
the sequence of the event: %s",
+ sequence, newSequence));
+ } else if (sequenceDifference < 0) {
+ logger.info("Replication message duplicate.");
+ } else if (!sequenceMessageId.equals(messageId)) {
+ throw new IllegalStateException(String.format(
+ "Replication event is out of order. Current sequence %s
belongs to message with id %s," +
+ "but the id of the event is %s", sequence,
sequenceMessageId, messageId));
+ }
+ }
+
+ private void processMessage(ActiveMQMessage message, ReplicaEventType
eventType, Object deserializedData,
+ TransactionId transactionId) throws Exception {
+ switch (eventType) {
+ case DESTINATION_UPSERT:
+ logger.trace("Processing replicated destination");
+ upsertDestination((ActiveMQDestination) deserializedData);
+ return;
+ case DESTINATION_DELETE:
+ logger.trace("Processing replicated destination deletion");
+ deleteDestination((ActiveMQDestination) deserializedData);
+ return;
+ case MESSAGE_SEND:
+ logger.trace("Processing replicated message send");
+ persistMessage((ActiveMQMessage) deserializedData,
transactionId);
+ return;
+ case MESSAGE_ACK:
+ logger.trace("Processing replicated messages dropped");
+ try {
+ messageAck((MessageAck) deserializedData,
+ (List<String>)
message.getObjectProperty(ReplicaSupport.MESSAGE_IDS_PROPERTY), transactionId);
+ } catch (JMSException e) {
+ logger.error("Failed to extract property to replicate
messages dropped [{}]", deserializedData, e);
+ throw new Exception(e);
+ }
+ return;
+ case QUEUE_PURGED:
+ logger.trace("Processing queue purge");
+ purgeQueue((ActiveMQDestination) deserializedData);
+ return;
+ case TRANSACTION_BEGIN:
+ logger.trace("Processing replicated transaction begin");
+ beginTransaction((TransactionId) deserializedData);
+ return;
+ case TRANSACTION_PREPARE:
+ logger.trace("Processing replicated transaction prepare");
+ prepareTransaction((TransactionId) deserializedData);
+ return;
+ case TRANSACTION_FORGET:
+ logger.trace("Processing replicated transaction forget");
+ forgetTransaction((TransactionId) deserializedData);
+ return;
+ case TRANSACTION_ROLLBACK:
+ logger.trace("Processing replicated transaction rollback");
+ rollbackTransaction((TransactionId) deserializedData);
+ return;
+ case TRANSACTION_COMMIT:
+ logger.trace("Processing replicated transaction commit");
+ try {
+ commitTransaction(
+ (TransactionId) deserializedData,
+
message.getBooleanProperty(ReplicaSupport.TRANSACTION_ONE_PHASE_PROPERTY));
+ } catch (JMSException e) {
+ logger.error("Failed to extract property to replicate
transaction commit with id [{}]", deserializedData, e);
+ throw new Exception(e);
+ }
+ return;
+ case ADD_DURABLE_CONSUMER:
+ logger.trace("Processing replicated add consumer");
+ try {
+ addDurableConsumer((ConsumerInfo) deserializedData,
+
message.getStringProperty(ReplicaSupport.CLIENT_ID_PROPERTY));
+ } catch (JMSException e) {
+ logger.error("Failed to extract property to replicate add
consumer [{}]", deserializedData, e);
+ throw new Exception(e);
+ }
+ return;
+ case REMOVE_DURABLE_CONSUMER:
+ logger.trace("Processing replicated remove consumer");
+ removeDurableConsumer((ConsumerInfo) deserializedData);
+ return;
+ default:
+ throw new IllegalStateException(
+ String.format("Unhandled event type \"%s\" for
replication message id: %s",
+ eventType, message.getJMSMessageID()));
+ }
+ }
+
+ private void processBatch(ActiveMQMessage message, TransactionId tid)
throws Exception {
+ List<Object> objects =
eventSerializer.deserializeListOfObjects(message.getContent().getData());
+ for (Object o : objects) {
+ processMessageWithRetries((ActiveMQMessage) o, tid);
+ }
+ }
+
+ private void upsertDestination(ActiveMQDestination destination) throws
Exception {
+ try {
+ boolean isExistingDestination =
Arrays.stream(broker.getDestinations())
+ .anyMatch(d ->
d.getQualifiedName().equals(destination.getQualifiedName()));
+ if (isExistingDestination) {
+ logger.debug("Destination [{}] already exists, no action to
take", destination);
+ return;
+ }
+ } catch (Exception e) {
+ logger.error("Unable to determine if [{}] is an existing
destination", destination, e);
+ throw e;
+ }
+ try {
+ broker.addDestination(connectionContext, destination, true);
+ } catch (Exception e) {
+ logger.error("Unable to add destination [{}]", destination, e);
+ throw e;
+ }
+ }
+
+ private void deleteDestination(ActiveMQDestination destination) throws
Exception {
+ try {
+ boolean isNonExtantDestination =
Arrays.stream(broker.getDestinations())
Review Comment:
nit: typo
##########
activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaCompactor.java:
##########
@@ -0,0 +1,303 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.replica;
+
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ConsumerBrokerExchange;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.PrefetchSubscription;
+import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.broker.region.QueueMessageReference;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.LocalTransactionId;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatchNotification;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.util.LongSequenceGenerator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class ReplicaCompactor {
+ private static final Logger logger =
LoggerFactory.getLogger(ReplicaCompactor.class);
+ private static final String CONSUMER_SELECTOR = String.format("%s LIKE
'%s'", ReplicaEventType.EVENT_TYPE_PROPERTY, ReplicaEventType.MESSAGE_ACK);
+ public static final int MAXIMUM_MESSAGES = 1_000;
+
+ private final Broker broker;
+ private final ConnectionContext connectionContext;
+ private final ReplicaReplicationQueueSupplier queueProvider;
+ private final PrefetchSubscription subscription;
+
+ private final Queue intermediateQueue;
+
+ public ReplicaCompactor(Broker broker, ConnectionContext
connectionContext, ReplicaReplicationQueueSupplier queueProvider,
PrefetchSubscription subscription) {
+ this.broker = broker;
+ this.connectionContext = connectionContext;
+ this.queueProvider = queueProvider;
+ this.subscription = subscription;
+
+ intermediateQueue =
broker.getDestinations(queueProvider.getIntermediateQueue()).stream().findFirst()
+ .map(DestinationExtractor::extractQueue).orElseThrow();
+ }
+
+ List<MessageReference> compactAndFilter(List<MessageReference> list,
boolean withAdditionalMessages) throws Exception {
+ List<DeliveredMessageReference> toProcess = list.stream()
+ .map(DeliveredMessageReference::new)
+ .collect(Collectors.toList());
+
+ int prefetchSize = subscription.getPrefetchSize();
+ try {
+ if (withAdditionalMessages) {
+ subscription.setPrefetchSize(0);
+ toProcess.addAll(getAdditionalMessages());
+ }
+
+ List<DeliveredMessageReference> processed =
compactAndFilter0(toProcess);
+
+ Set<MessageId> messageIds =
list.stream().map(MessageReference::getMessageId).collect(Collectors.toSet());
+
+ return processed.stream()
+ .map(dmr -> dmr.messageReference)
+ .filter(mr -> messageIds.contains(mr.getMessageId()))
+ .collect(Collectors.toList());
+ } finally {
+ subscription.setPrefetchSize(prefetchSize);
+ }
+ }
+
+ private List<DeliveredMessageReference> getAdditionalMessages() throws
Exception {
+ List<DeliveredMessageReference> result = new ArrayList<>();
+ List<QueueMessageReference> additionalMessages =
intermediateQueue.getMatchingMessages(connectionContext, CONSUMER_SELECTOR,
MAXIMUM_MESSAGES);
+ if (additionalMessages.isEmpty()) {
+ return result;
+ }
+
+ String selector = String.format("%s IN %s",
ReplicaSupport.MESSAGE_ID_PROPERTY, getAckedMessageIds(additionalMessages));
+
additionalMessages.addAll(intermediateQueue.getMatchingMessages(connectionContext,
selector, MAXIMUM_MESSAGES));
+
+ Set<MessageId> dispatchedMessageIds =
subscription.getDispatched().stream()
+ .map(MessageReference::getMessageId)
+ .collect(Collectors.toSet());
+
+ for (MessageReference messageReference : additionalMessages) {
+ if
(!dispatchedMessageIds.contains(messageReference.getMessageId())) {
+ result.add(new DeliveredMessageReference(messageReference,
false));
+ }
+ }
+
+ return result;
+ }
+
+ private List<DeliveredMessageReference>
compactAndFilter0(List<DeliveredMessageReference> list) throws Exception {
+ List<DeliveredMessageReference> result = new ArrayList<>(list);
+
+ List<Destination> destinations = combineByDestination(list);
+
+ List<DeliveredMessageId> toDelete = compact(destinations);
+
+ if (toDelete.isEmpty()) {
+ return result;
+ }
+
+ acknowledge(toDelete);
+
+ List<MessageId> messageIds = toDelete.stream().map(dmid ->
dmid.messageId).collect(Collectors.toList());
+ result.removeIf(reference ->
messageIds.contains(reference.messageReference.getMessageId()));
+
+ return result;
+ }
+
+ private void acknowledge(List<DeliveredMessageId> list) throws Exception {
+ TransactionId transactionId = new LocalTransactionId(
+ new
ConnectionId(ReplicaSupport.REPLICATION_PLUGIN_CONNECTION_ID),
+
ReplicaSupport.LOCAL_TRANSACTION_ID_GENERATOR.getNextSequenceId());
+
+ synchronized (ReplicaSupport.INTERMEDIATE_QUEUE_MUTEX) {
+ broker.beginTransaction(connectionContext, transactionId);
+
+ ConsumerBrokerExchange consumerExchange = new
ConsumerBrokerExchange();
+ consumerExchange.setConnectionContext(connectionContext);
+
+ for (DeliveredMessageId deliveredMessageId : list) {
+ if (!deliveredMessageId.delivered) {
+ messageDispatch(deliveredMessageId.messageId);
+ }
+
+ MessageAck messageAck = new MessageAck();
+ messageAck.setMessageID(deliveredMessageId.messageId);
+ messageAck.setMessageCount(1);
+ messageAck.setAckType(MessageAck.INDIVIDUAL_ACK_TYPE);
+
messageAck.setDestination(queueProvider.getIntermediateQueue());
+
+ consumerExchange.setSubscription(subscription);
+
+ broker.acknowledge(consumerExchange, messageAck);
+ }
+
+ broker.commitTransaction(connectionContext, transactionId, true);
+ }
+ }
+
+ private List<Destination>
combineByDestination(List<DeliveredMessageReference> list) throws Exception {
+ Map<String, Destination> result = new HashMap<>();
+ for (DeliveredMessageReference reference : list) {
+ ActiveMQMessage message = (ActiveMQMessage)
reference.messageReference.getMessage();
+
+ ReplicaEventType eventType =
+
ReplicaEventType.valueOf(message.getStringProperty(ReplicaEventType.EVENT_TYPE_PROPERTY));
+ if (eventType != ReplicaEventType.MESSAGE_SEND && eventType !=
ReplicaEventType.MESSAGE_ACK) {
+ continue;
+ }
+
+ if
(!message.getBooleanProperty(ReplicaSupport.IS_ORIGINAL_MESSAGE_SENT_TO_QUEUE_PROPERTY)
+ ||
message.getBooleanProperty(ReplicaSupport.IS_ORIGINAL_MESSAGE_IN_XA_TRANSACTION_PROPERTY))
{
+ continue;
+ }
+
+ Destination destination =
+
result.computeIfAbsent(message.getStringProperty(ReplicaSupport.ORIGINAL_MESSAGE_DESTINATION_PROPERTY),
+ k -> new Destination());
+
+ if (eventType == ReplicaEventType.MESSAGE_SEND) {
+
destination.sendMap.put(message.getStringProperty(ReplicaSupport.MESSAGE_ID_PROPERTY),
+ new DeliveredMessageId(message.getMessageId(),
reference.delivered));
+ }
+ if (eventType == ReplicaEventType.MESSAGE_ACK) {
+ List<String> messageIds = getAckMessageIds(message);
+ destination.acks.add(new Ack(messageIds, message,
reference.delivered));
+ }
+ }
+
+ return new ArrayList<>(result.values());
+ }
+
+ private List<DeliveredMessageId> compact(List<Destination> destinations)
throws IOException {
+ List<DeliveredMessageId> result = new ArrayList<>();
+ for (Destination destination : destinations) {
+ for (Ack ack : destination.acks) {
+ List<String> sends = new ArrayList<>();
+ for (String id : ack.messageIdsToAck) {
+ if (destination.sendMap.containsKey(id)) {
+ sends.add(id);
+ result.add(destination.sendMap.get(id));
+ }
+ }
+ if (sends.size() == 0) {
+ continue;
+ }
+
+ if (ack.messageIdsToAck.size() == sends.size() && new
HashSet<>(ack.messageIdsToAck).containsAll(sends)) {
+ result.add(ack);
+ } else {
+ updateMessage(ack.message, ack.messageIdsToAck, sends);
+ }
+ }
+ }
+
+ return result;
+ }
+
+ private void updateMessage(ActiveMQMessage message, List<String>
messageIdsToAck, List<String> sends) throws IOException {
+ message.setProperty(ReplicaSupport.ORIGINAL_MESSAGE_IDS_PROPERTY,
messageIdsToAck);
+ ArrayList<String> newList = new ArrayList<>(messageIdsToAck);
+ newList.removeAll(sends);
+ message.setProperty(ReplicaSupport.MESSAGE_IDS_PROPERTY, newList);
+
+ synchronized (ReplicaSupport.INTERMEDIATE_QUEUE_MUTEX) {
+ intermediateQueue.getMessageStore().updateMessage(message);
+ }
+ }
+
+ private String getAckedMessageIds(List<QueueMessageReference> ackMessages)
throws IOException {
+ List<String> messageIds = new ArrayList<>();
+ for (QueueMessageReference messageReference : ackMessages) {
+ ActiveMQMessage message = (ActiveMQMessage)
messageReference.getMessage();
+
+ messageIds.addAll(getAckMessageIds(message));
+ }
+
+ return messageIds.stream().collect(Collectors.joining("','", "('",
"')"));
+ }
+
+ private void messageDispatch(MessageId messageId) throws Exception {
+ MessageDispatchNotification mdn = new MessageDispatchNotification();
+ mdn.setConsumerId(subscription.getConsumerInfo().getConsumerId());
+ mdn.setDestination(queueProvider.getIntermediateQueue());
+ mdn.setMessageId(messageId);
+ broker.processDispatchNotification(mdn);
+ }
+
+ @SuppressWarnings("unchecked")
+ private static List<String> getAckMessageIds(ActiveMQMessage message)
throws IOException {
+ return (List<String>)
+
Optional.ofNullable(message.getProperty(ReplicaSupport.ORIGINAL_MESSAGE_IDS_PROPERTY))
+
.orElse(message.getProperty(ReplicaSupport.MESSAGE_IDS_PROPERTY));
+ }
+
+ private static class DeliveredMessageReference {
+ final MessageReference messageReference;
+ final boolean delivered;
+
+ public DeliveredMessageReference(MessageReference messageReference) {
+ this(messageReference, true);
+ }
+
+ public DeliveredMessageReference(MessageReference messageReference,
boolean delivered) {
+ this.messageReference = messageReference;
+ this.delivered = delivered;
+ }
+ }
+
+ private static class Destination {
Review Comment:
This confused me in the code above, since other Destination classes such as
org.apache.activemq.broker.region.Destination and javax.jms.Destination are
used throughout the code base. Maybe rename it to something more descriptive
of what it is used for?
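To illustrate the kind of rename I mean, here is a rough sketch. The names
`DestinationEvents` and `groupSends` are made up, and plain strings stand in
for the real message id types. It only mirrors the grouping that
combineByDestination appears to do, so treat it as an assumption about intent
rather than a drop-in replacement.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class DestinationEventsSketch {

    /** Hypothetical replacement name for the nested Destination class. */
    static final class DestinationEvents {
        // original message id -> id of the replication message carrying the send
        final Map<String, String> sendMap = new HashMap<>();
        // each entry lists the original message ids covered by one ack event
        final List<List<String>> acks = new ArrayList<>();
    }

    /**
     * Groups send events by original destination name.
     * Each array is {destinationName, originalMessageId, replicationMessageId}.
     */
    static Map<String, DestinationEvents> groupSends(List<String[]> sends) {
        Map<String, DestinationEvents> result = new HashMap<>();
        for (String[] send : sends) {
            result.computeIfAbsent(send[0], k -> new DestinationEvents())
                    .sendMap.put(send[1], send[2]);
        }
        return result;
    }
}
```

With a name like this, a reader no longer has to work out which Destination
type a given line refers to.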
##########
activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaCompactor.java:
##########
@@ -0,0 +1,303 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.replica;
+
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ConsumerBrokerExchange;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.PrefetchSubscription;
+import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.broker.region.QueueMessageReference;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.LocalTransactionId;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatchNotification;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.util.LongSequenceGenerator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class ReplicaCompactor {
+ private static final Logger logger =
LoggerFactory.getLogger(ReplicaCompactor.class);
+ private static final String CONSUMER_SELECTOR = String.format("%s LIKE
'%s'", ReplicaEventType.EVENT_TYPE_PROPERTY, ReplicaEventType.MESSAGE_ACK);
+ public static final int MAXIMUM_MESSAGES = 1_000;
+
+ private final Broker broker;
+ private final ConnectionContext connectionContext;
+ private final ReplicaReplicationQueueSupplier queueProvider;
+ private final PrefetchSubscription subscription;
+
+ private final Queue intermediateQueue;
+
+ public ReplicaCompactor(Broker broker, ConnectionContext
connectionContext, ReplicaReplicationQueueSupplier queueProvider,
PrefetchSubscription subscription) {
+ this.broker = broker;
+ this.connectionContext = connectionContext;
+ this.queueProvider = queueProvider;
+ this.subscription = subscription;
+
+ intermediateQueue =
broker.getDestinations(queueProvider.getIntermediateQueue()).stream().findFirst()
+ .map(DestinationExtractor::extractQueue).orElseThrow();
+ }
+
+ List<MessageReference> compactAndFilter(List<MessageReference> list,
boolean withAdditionalMessages) throws Exception {
+ List<DeliveredMessageReference> toProcess = list.stream()
+ .map(DeliveredMessageReference::new)
+ .collect(Collectors.toList());
+
+ int prefetchSize = subscription.getPrefetchSize();
+ try {
+ if (withAdditionalMessages) {
+ subscription.setPrefetchSize(0);
+ toProcess.addAll(getAdditionalMessages());
+ }
+
+ List<DeliveredMessageReference> processed =
compactAndFilter0(toProcess);
+
+ Set<MessageId> messageIds =
list.stream().map(MessageReference::getMessageId).collect(Collectors.toSet());
+
+ return processed.stream()
+ .map(dmr -> dmr.messageReference)
+ .filter(mr -> messageIds.contains(mr.getMessageId()))
+ .collect(Collectors.toList());
+ } finally {
+ subscription.setPrefetchSize(prefetchSize);
+ }
+ }
+
+ private List<DeliveredMessageReference> getAdditionalMessages() throws
Exception {
+ List<DeliveredMessageReference> result = new ArrayList<>();
+ List<QueueMessageReference> additionalMessages =
intermediateQueue.getMatchingMessages(connectionContext, CONSUMER_SELECTOR,
MAXIMUM_MESSAGES);
+ if (additionalMessages.isEmpty()) {
+ return result;
+ }
+
+ String selector = String.format("%s IN %s",
ReplicaSupport.MESSAGE_ID_PROPERTY, getAckedMessageIds(additionalMessages));
+
additionalMessages.addAll(intermediateQueue.getMatchingMessages(connectionContext,
selector, MAXIMUM_MESSAGES));
Review Comment:
Using a selector here means you're doing a full scan of the queue every
time. That seems expensive!
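One possible alternative, as a minimal sketch rather than a concrete proposal:
keep an in-memory index from original message id to the replication message
that carries the send, so matching acks to sends costs one lookup per acked id
instead of a scan. `SendIndexSketch` and its methods are hypothetical and are
not part of the Queue API. Whether such an index can be kept in sync with the
intermediate queue (restarts, rollbacks) is an open question.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class SendIndexSketch {
    // original message id -> replication message id, maintained as sends come and go
    private final Map<String, String> sendsByOriginalId = new HashMap<>();

    /** Record a send event when it is added to the intermediate queue. */
    void onSendEnqueued(String originalId, String replicationId) {
        sendsByOriginalId.put(originalId, replicationId);
    }

    /** Forget a send event once it has been removed from the intermediate queue. */
    void onSendRemoved(String originalId) {
        sendsByOriginalId.remove(originalId);
    }

    /** Returns the replication message ids of the sends that the given acks cover. */
    List<String> findSends(List<String> ackedOriginalIds) {
        List<String> found = new ArrayList<>();
        for (String id : ackedOriginalIds) {
            String replicationId = sendsByOriginalId.get(id);
            if (replicationId != null) {
                found.add(replicationId);
            }
        }
        return found;
    }
}
```

The lookup cost then depends on the number of acked ids rather than on the
depth of the queue, at the price of extra memory and bookkeeping.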
##########
activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaBrokerEventListener.java:
##########
@@ -0,0 +1,482 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.replica;
+
+import org.apache.activemq.ScheduledMessage;
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ConsumerBrokerExchange;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.DurableTopicSubscription;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.MessageReferenceFilter;
+import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.LocalTransactionId;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatchNotification;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.TransactionId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static java.util.Objects.requireNonNull;
+
+public class ReplicaBrokerEventListener implements MessageListener {
+
+ private static final String REPLICATION_CONSUMER_CLIENT_ID =
"DUMMY_REPLICATION_CONSUMER";
+ private static final String SEQUENCE_NAME = "replicaSeq";
+ private final Logger logger =
LoggerFactory.getLogger(ReplicaBrokerEventListener.class);
+ private final ReplicaEventSerializer eventSerializer = new
ReplicaEventSerializer();
+ private final Broker broker;
+ private final ConnectionContext connectionContext;
+ private final ReplicaInternalMessageProducer
replicaInternalMessageProducer;
+
+ private final PeriodAcknowledge acknowledgeCallback;
+ final ReplicaSequenceStorage sequenceStorage;
+ BigInteger sequence;
+ MessageId sequenceMessageId;
+
+ ReplicaBrokerEventListener(Broker broker, ReplicaReplicationQueueSupplier
queueProvider, PeriodAcknowledge acknowledgeCallback) {
+ this.broker = requireNonNull(broker);
+ this.acknowledgeCallback = requireNonNull(acknowledgeCallback);
+ connectionContext = broker.getAdminConnectionContext().copy();
+
connectionContext.setUserName(ReplicaSupport.REPLICATION_PLUGIN_USER_NAME);
+ connectionContext.setClientId(REPLICATION_CONSUMER_CLIENT_ID);
+ connectionContext.setConnection(new DummyConnection());
+ replicaInternalMessageProducer = new
ReplicaInternalMessageProducer(broker, connectionContext);
+
+ createTransactionMapIfNotExist();
+
+ this.sequenceStorage = new ReplicaSequenceStorage(broker,
connectionContext,
+ queueProvider, replicaInternalMessageProducer, SEQUENCE_NAME);
+ }
+
+ public void initialize() throws Exception {
+ String savedSequence = sequenceStorage.initialize();
+ sequence = savedSequence == null ? null : new
BigInteger(savedSequence);
+ }
+
+ @Override
+ public void onMessage(Message jmsMessage) {
+ logger.trace("Received replication message from replica source");
+ ActiveMQMessage message = (ActiveMQMessage) jmsMessage;
+
+ processMessageWithRetries(message, null);
+ }
+
+ private synchronized void processMessageWithRetries(ActiveMQMessage
message, TransactionId transactionId) {
+ new ReplicaEventRetrier(() -> {
+ boolean commit = false;
+ TransactionId tid = transactionId;
+ if (tid == null) {
+ tid = new LocalTransactionId(
+ new
ConnectionId(ReplicaSupport.REPLICATION_PLUGIN_CONNECTION_ID),
+
ReplicaSupport.LOCAL_TRANSACTION_ID_GENERATOR.getNextSequenceId());
+
+ broker.beginTransaction(connectionContext, tid);
+
+ commit = true;
+ }
+
+ try {
+ ReplicaEventType eventType = getEventType(message);
+ if (eventType == ReplicaEventType.BATCH) {
+ processBatch(message, tid);
+ } else {
+ processMessage(message, eventType, tid);
+ }
+
+ if (commit) {
+ sequenceStorage.enqueue(tid, sequence.toString());
+
+ broker.commitTransaction(connectionContext, tid, true);
+ acknowledgeCallback.setSafeToAck(true);
+ }
+ } catch (Exception e) {
+ if (commit) {
+ broker.rollbackTransaction(connectionContext, tid);
+ }
+ acknowledgeCallback.setSafeToAck(false);
+ throw e;
+ }
+ return null;
+ }).process();
+ }
+
+ private void processMessage(ActiveMQMessage message, ReplicaEventType
eventType, TransactionId transactionId) throws Exception {
+ Object deserializedData =
eventSerializer.deserializeMessageData(message.getContent());
+ BigInteger newSequence = new
BigInteger(message.getStringProperty(ReplicaSupport.SEQUENCE_PROPERTY));
+
+ long sequenceDifference = sequence == null ? 0 :
newSequence.subtract(sequence).longValue();
+ MessageId messageId = message.getMessageId();
+ if (sequence == null || sequenceDifference == 1) {
+ processMessage(message, eventType, deserializedData,
transactionId);
+
+ sequence = newSequence;
+ sequenceMessageId = messageId;
+
+ } else if (sequenceDifference > 0) {
+ throw new IllegalStateException(String.format(
+ "Replication event is out of order. Current sequence: %s,
the sequence of the event: %s",
+ sequence, newSequence));
+ } else if (sequenceDifference < 0) {
+ logger.info("Replication message duplicate.");
+ } else if (!sequenceMessageId.equals(messageId)) {
+ throw new IllegalStateException(String.format(
+ "Replication event is out of order. Current sequence %s
belongs to message with id %s," +
+ "but the id of the event is %s", sequence,
sequenceMessageId, messageId));
+ }
+ }
+
+ private void processMessage(ActiveMQMessage message, ReplicaEventType
eventType, Object deserializedData,
+ TransactionId transactionId) throws Exception {
+ switch (eventType) {
+ case DESTINATION_UPSERT:
+ logger.trace("Processing replicated destination");
+ upsertDestination((ActiveMQDestination) deserializedData);
+ return;
+ case DESTINATION_DELETE:
+ logger.trace("Processing replicated destination deletion");
+ deleteDestination((ActiveMQDestination) deserializedData);
+ return;
+ case MESSAGE_SEND:
+ logger.trace("Processing replicated message send");
+ persistMessage((ActiveMQMessage) deserializedData,
transactionId);
+ return;
+ case MESSAGE_ACK:
+ logger.trace("Processing replicated messages dropped");
+ try {
+ messageAck((MessageAck) deserializedData,
+ (List<String>)
message.getObjectProperty(ReplicaSupport.MESSAGE_IDS_PROPERTY), transactionId);
+ } catch (JMSException e) {
+ logger.error("Failed to extract property to replicate
messages dropped [{}]", deserializedData, e);
+ throw new Exception(e);
+ }
+ return;
+ case QUEUE_PURGED:
+ logger.trace("Processing queue purge");
+ purgeQueue((ActiveMQDestination) deserializedData);
+ return;
+ case TRANSACTION_BEGIN:
+ logger.trace("Processing replicated transaction begin");
+ beginTransaction((TransactionId) deserializedData);
+ return;
+ case TRANSACTION_PREPARE:
+ logger.trace("Processing replicated transaction prepare");
+ prepareTransaction((TransactionId) deserializedData);
+ return;
+ case TRANSACTION_FORGET:
+ logger.trace("Processing replicated transaction forget");
+ forgetTransaction((TransactionId) deserializedData);
+ return;
+ case TRANSACTION_ROLLBACK:
+ logger.trace("Processing replicated transaction rollback");
+ rollbackTransaction((TransactionId) deserializedData);
+ return;
+ case TRANSACTION_COMMIT:
+ logger.trace("Processing replicated transaction commit");
+ try {
+ commitTransaction(
+ (TransactionId) deserializedData,
+
message.getBooleanProperty(ReplicaSupport.TRANSACTION_ONE_PHASE_PROPERTY));
+ } catch (JMSException e) {
+ logger.error("Failed to extract property to replicate
transaction commit with id [{}]", deserializedData, e);
+ throw new Exception(e);
+ }
+ return;
+ case ADD_DURABLE_CONSUMER:
+ logger.trace("Processing replicated add consumer");
+ try {
+ addDurableConsumer((ConsumerInfo) deserializedData,
+
message.getStringProperty(ReplicaSupport.CLIENT_ID_PROPERTY));
+ } catch (JMSException e) {
+ logger.error("Failed to extract property to replicate add
consumer [{}]", deserializedData, e);
+ throw new Exception(e);
+ }
+ return;
+ case REMOVE_DURABLE_CONSUMER:
+ logger.trace("Processing replicated remove consumer");
+ removeDurableConsumer((ConsumerInfo) deserializedData);
+ return;
+ default:
+ throw new IllegalStateException(
+ String.format("Unhandled event type \"%s\" for
replication message id: %s",
+ eventType, message.getJMSMessageID()));
+ }
+ }
+
+ private void processBatch(ActiveMQMessage message, TransactionId tid)
throws Exception {
+ List<Object> objects =
eventSerializer.deserializeListOfObjects(message.getContent().getData());
+ for (Object o : objects) {
+ processMessageWithRetries((ActiveMQMessage) o, tid);
+ }
+ }
+
+ private void upsertDestination(ActiveMQDestination destination) throws
Exception {
+ try {
+ boolean isExistingDestination =
Arrays.stream(broker.getDestinations())
+ .anyMatch(d ->
d.getQualifiedName().equals(destination.getQualifiedName()));
+ if (isExistingDestination) {
+ logger.debug("Destination [{}] already exists, no action to
take", destination);
+ return;
+ }
+ } catch (Exception e) {
+ logger.error("Unable to determine if [{}] is an existing
destination", destination, e);
+ throw e;
+ }
+ try {
+ broker.addDestination(connectionContext, destination, true);
+ } catch (Exception e) {
+ logger.error("Unable to add destination [{}]", destination, e);
+ throw e;
+ }
+ }
+
+ private void deleteDestination(ActiveMQDestination destination) throws
Exception {
+ try {
+ boolean isNonExtantDestination =
Arrays.stream(broker.getDestinations())
+ .noneMatch(d ->
d.getQualifiedName().equals(destination.getQualifiedName()));
+ if (isNonExtantDestination) {
+ logger.debug("Destination [{}] does not exist, no action to
take", destination);
+ return;
+ }
+ } catch (Exception e) {
+ logger.error("Unable to determine if [{}] is an existing
destination", destination, e);
+ throw e;
+ }
+ try {
+ broker.removeDestination(connectionContext, destination, 1000);
Review Comment:
What is the `1000` here?
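If the third argument is a timeout in milliseconds (that is my assumption, not
something this diff confirms), a named constant would make it obvious. A
minimal sketch, where `DestinationRemover` is a stand-in for the broker and
the constant name is made up:

```java
import java.util.concurrent.TimeUnit;

final class RemovalTimeoutSketch {
    // Hypothetical constant; assumes the value is a timeout expressed in milliseconds.
    static final long DESTINATION_REMOVAL_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(1);

    /** Stand-in for the broker call so that the sketch compiles on its own. */
    interface DestinationRemover {
        void removeDestination(String destination, long timeoutMillis);
    }

    /** Removes the destination using the named timeout instead of a bare literal. */
    static void delete(DestinationRemover remover, String destination) {
        remover.removeDestination(destination, DESTINATION_REMOVAL_TIMEOUT_MS);
    }
}
```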
##########
activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaSequenceStorage.java:
##########
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.replica;
+
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ConsumerBrokerExchange;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.PrefetchSubscription;
+import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.broker.region.QueueMessageReference;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.LocalTransactionId;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.command.SessionId;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.util.IdGenerator;
+import org.apache.activemq.util.LongSequenceGenerator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+public class ReplicaSequenceStorage {
+
+ private final Logger logger =
LoggerFactory.getLogger(ReplicaSequenceStorage.class);
+
+ static final String SEQUENCE_NAME_PROPERTY = "SequenceName";
+ private final LongSequenceGenerator eventMessageIdGenerator = new
LongSequenceGenerator();
+ private final ProducerId replicationProducerId = new ProducerId();
+ private final Broker broker;
+ private final ConnectionContext connectionContext;
+ private final ReplicaInternalMessageProducer
replicaInternalMessageProducer;
+ private final String sequenceName;
+ private final ReplicaReplicationQueueSupplier queueProvider;
+
+ private Queue sequenceQueue;
+ private PrefetchSubscription subscription;
+
+ public ReplicaSequenceStorage(Broker broker, ConnectionContext
connectionContext, ReplicaReplicationQueueSupplier queueProvider,
+ ReplicaInternalMessageProducer
replicaInternalMessageProducer, String sequenceName) {
+ this.broker = requireNonNull(broker);
+ this.connectionContext = connectionContext;
+ this.replicaInternalMessageProducer = replicaInternalMessageProducer;
+ this.sequenceName = requireNonNull(sequenceName);
+ this.queueProvider = queueProvider;
+
+ replicationProducerId.setConnectionId(new IdGenerator().generateId());
+ }
+
+ public String initialize() throws Exception {
+ sequenceQueue =
broker.getDestinations(queueProvider.getSequenceQueue()).stream().findFirst()
+ .map(DestinationExtractor::extractQueue).orElseThrow();
+
+ String selector = String.format("%s LIKE '%s'",
SEQUENCE_NAME_PROPERTY, sequenceName);
+
+ ConnectionId connectionId = new ConnectionId(new
IdGenerator("ReplicationPlugin.ReplicaSequenceStorage").generateId());
+ SessionId sessionId = new SessionId(connectionId, new
LongSequenceGenerator().getNextSequenceId());
+ ConsumerId consumerId = new ConsumerId(sessionId, new
LongSequenceGenerator().getNextSequenceId());
+ ConsumerInfo consumerInfo = new ConsumerInfo();
+ consumerInfo.setConsumerId(consumerId);
+ consumerInfo.setPrefetchSize(10);
+ consumerInfo.setDestination(queueProvider.getSequenceQueue());
+ consumerInfo.setSelector(selector);
+ subscription = (PrefetchSubscription)
broker.addConsumer(connectionContext, consumerInfo);
+
+ List<ActiveMQTextMessage> allMessages = new ArrayList<>();
+ for (MessageId messageId : sequenceQueue.getAllMessageIds()) {
+ ActiveMQTextMessage message = getMessageByMessageId(messageId);
+ if
(message.getStringProperty(SEQUENCE_NAME_PROPERTY).equals(sequenceName)) {
Review Comment:
You already have the selector checking this, right?
##########
activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaSequenceStorage.java:
##########
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.replica;
+
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ConsumerBrokerExchange;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.PrefetchSubscription;
+import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.broker.region.QueueMessageReference;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.LocalTransactionId;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.command.SessionId;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.util.IdGenerator;
+import org.apache.activemq.util.LongSequenceGenerator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+public class ReplicaSequenceStorage {
+
+ private final Logger logger =
LoggerFactory.getLogger(ReplicaSequenceStorage.class);
+
+ static final String SEQUENCE_NAME_PROPERTY = "SequenceName";
+ private final LongSequenceGenerator eventMessageIdGenerator = new
LongSequenceGenerator();
+ private final ProducerId replicationProducerId = new ProducerId();
+ private final Broker broker;
+ private final ConnectionContext connectionContext;
+ private final ReplicaInternalMessageProducer
replicaInternalMessageProducer;
+ private final String sequenceName;
+ private final ReplicaReplicationQueueSupplier queueProvider;
+
+ private Queue sequenceQueue;
+ private PrefetchSubscription subscription;
+
+ public ReplicaSequenceStorage(Broker broker, ConnectionContext
connectionContext, ReplicaReplicationQueueSupplier queueProvider,
+ ReplicaInternalMessageProducer
replicaInternalMessageProducer, String sequenceName) {
+ this.broker = requireNonNull(broker);
+ this.connectionContext = connectionContext;
+ this.replicaInternalMessageProducer = replicaInternalMessageProducer;
+ this.sequenceName = requireNonNull(sequenceName);
+ this.queueProvider = queueProvider;
+
+ replicationProducerId.setConnectionId(new IdGenerator().generateId());
+ }
+
+ public String initialize() throws Exception {
+ sequenceQueue =
broker.getDestinations(queueProvider.getSequenceQueue()).stream().findFirst()
+ .map(DestinationExtractor::extractQueue).orElseThrow();
+
+ String selector = String.format("%s LIKE '%s'",
SEQUENCE_NAME_PROPERTY, sequenceName);
+
+ ConnectionId connectionId = new ConnectionId(new
IdGenerator("ReplicationPlugin.ReplicaSequenceStorage").generateId());
+ SessionId sessionId = new SessionId(connectionId, new
LongSequenceGenerator().getNextSequenceId());
+ ConsumerId consumerId = new ConsumerId(sessionId, new
LongSequenceGenerator().getNextSequenceId());
+ ConsumerInfo consumerInfo = new ConsumerInfo();
+ consumerInfo.setConsumerId(consumerId);
+ consumerInfo.setPrefetchSize(10);
+ consumerInfo.setDestination(queueProvider.getSequenceQueue());
+ consumerInfo.setSelector(selector);
+ subscription = (PrefetchSubscription)
broker.addConsumer(connectionContext, consumerInfo);
+
+ List<ActiveMQTextMessage> allMessages = new ArrayList<>();
+ for (MessageId messageId : sequenceQueue.getAllMessageIds()) {
+ ActiveMQTextMessage message = getMessageByMessageId(messageId);
+ if
(message.getStringProperty(SEQUENCE_NAME_PROPERTY).equals(sequenceName)) {
+ allMessages.add(message);
+ }
+ }
+
+ if (allMessages.size() == 0) {
+ return null;
+ }
+
+ if (allMessages.size() > 1) {
+ for (int i = 0; i < allMessages.size() - 1; i++) {
+
sequenceQueue.removeMessage(allMessages.get(i).getMessageId().toString());
+ }
+ }
+
+ return allMessages.get(0).getText();
+ }
+
+ public void enqueue(TransactionId tid, String message) throws Exception {
+ // before enqueue message, we acknowledge all messages currently in
queue.
+ acknowledgeAll(tid);
+
+ send(tid, message);
+ }
+
+ private void acknowledgeAll(TransactionId tid) throws Exception {
+ List<MessageReference> dispatched = subscription.getDispatched();
+ ConsumerBrokerExchange consumerExchange = new ConsumerBrokerExchange();
+ consumerExchange.setConnectionContext(connectionContext);
+ consumerExchange.setSubscription(subscription);
+
+ for(MessageReference messageReference: dispatched) {
Review Comment:
Is it any cheaper to send 1 ack for the whole range?
`ack.setFirstMessageId(...)`
`ack.setLastMessageId(...)`
`ack.setMessageCount(...)`
##########
activemq-broker/src/main/java/org/apache/activemq/replica/PeriodAcknowledge.java:
##########
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.replica;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQSession;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class PeriodAcknowledge {
+
+ private static final int MAX_ACK_BATCH_SIZE = 100;
+ private boolean safeToAck = true;
+ private final AtomicLong lastAckTime = new AtomicLong();
+ private final AtomicInteger pendingAckCount = new AtomicInteger();
+ private final AtomicReference<ActiveMQConnection> connection = new
AtomicReference<>();
+ private final AtomicReference<ActiveMQSession> connectionSession = new
AtomicReference<>();
+ private final long replicaAckPeriod;
+ private final Object periodicCommitLock = new Object();
+
+
+ public PeriodAcknowledge(long replicaAckPeriod) {
+ this.replicaAckPeriod = replicaAckPeriod;
+ }
+
+ public void setConnection(ActiveMQConnection activeMQConnection) {
+ connection.set(activeMQConnection);
+ }
+
+ public void setConnectionSession(ActiveMQSession activeMQSession) {
+ connectionSession.set(activeMQSession);
+ }
+
+ public void setSafeToAck(boolean safeToAck) {
+ this.safeToAck = safeToAck;
+ }
+
+ private boolean shouldPeriodicallyCommit() {
+ return System.currentTimeMillis() - lastAckTime.get() >=
replicaAckPeriod;
+ }
+
+ private boolean reachedMaxAckBatchSize() {
+ return pendingAckCount.incrementAndGet() >= MAX_ACK_BATCH_SIZE;
+ }
+
+ public void acknowledge() throws Exception {
+ if (connection.get() == null || connectionSession.get() == null ||
!safeToAck) {
+ return;
+ }
+
+ synchronized (periodicCommitLock) {
Review Comment:
I think there's less contention if you only lock when
`reachedMaxAckBatchSize() || shouldPeriodicallyCommit()`. You'd have to double
check once you get the lock though I guess 🤔
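
A rough sketch of the check, lock, re-check shape I mean, using only JDK classes. `GatedAcknowledger`, `shouldFlush()` and `flushCount` are made-up names standing in for `PeriodAcknowledge`, and the real flush would call into the session rather than bump a counter. One thing to watch: the guard has to be free of side effects, so the increment is pulled out of the batch-size check. As written, `reachedMaxAckBatchSize()` increments on every call and would count twice if it were re-evaluated under the lock.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for PeriodAcknowledge; not the PR's actual class.
class GatedAcknowledger {
    private final int maxBatchSize;
    private final long ackPeriodMillis;
    private final AtomicInteger pendingAckCount = new AtomicInteger();
    private final AtomicLong lastAckTime = new AtomicLong(System.currentTimeMillis());
    private final AtomicInteger flushCount = new AtomicInteger();
    private final Object lock = new Object();

    GatedAcknowledger(int maxBatchSize, long ackPeriodMillis) {
        this.maxBatchSize = maxBatchSize;
        this.ackPeriodMillis = ackPeriodMillis;
    }

    // Pure check: safe to evaluate both outside and inside the lock.
    private boolean shouldFlush() {
        return pendingAckCount.get() >= maxBatchSize
                || System.currentTimeMillis() - lastAckTime.get() >= ackPeriodMillis;
    }

    void acknowledge() {
        pendingAckCount.incrementAndGet();
        if (!shouldFlush()) {
            return; // fast path: no lock taken
        }
        synchronized (lock) {
            if (!shouldFlush()) {
                return; // another thread flushed while we waited for the lock
            }
            // The real code would acknowledge on the session here.
            flushCount.incrementAndGet();
            pendingAckCount.set(0);
            lastAckTime.set(System.currentTimeMillis());
        }
    }

    int flushCount() {
        return flushCount.get();
    }

    int pending() {
        return pendingAckCount.get();
    }
}
```

With this shape most calls never touch the monitor, and only the calls that cross the batch or time threshold contend for it.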
##########
activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaAckHelper.java:
##########
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.replica;
+
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.PrefetchSubscription;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageId;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class ReplicaAckHelper {
+
+ private final Broker broker;
+
+ public ReplicaAckHelper(Broker broker) {
+ this.broker = broker;
+ }
+
+ public List<MessageReference> getMessagesToAck(MessageAck ack, Destination
destination) {
+ PrefetchSubscription prefetchSubscription =
getPrefetchSubscription(destination, ack.getConsumerId());
+ if (prefetchSubscription == null) {
+ return null;
Review Comment:
Can we return an Optional here?
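
Something along these lines, as a sketch only. `AckLookup` and its string-keyed map are hypothetical simplifications (the real method takes a `MessageAck` and a `Destination`); the point is just that "no subscription" becomes an empty `Optional` instead of `null`.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical, simplified model of ReplicaAckHelper.getMessagesToAck.
class AckLookup {
    // consumer id -> ids of messages dispatched to that consumer
    private final Map<String, List<String>> dispatchedByConsumer;

    AckLookup(Map<String, List<String>> dispatchedByConsumer) {
        this.dispatchedByConsumer = dispatchedByConsumer;
    }

    // Empty Optional means "no subscription found for this consumer".
    Optional<List<String>> getMessagesToAck(String consumerId) {
        return Optional.ofNullable(dispatchedByConsumer.get(consumerId));
    }
}
```

Callers can then write `getMessagesToAck(id).orElse(List.of())` or `ifPresent(...)` and the missing-subscription case can't be forgotten.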
##########
activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaCompactor.java:
##########
@@ -0,0 +1,303 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.replica;
+
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ConsumerBrokerExchange;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.PrefetchSubscription;
+import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.broker.region.QueueMessageReference;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.LocalTransactionId;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatchNotification;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.util.LongSequenceGenerator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class ReplicaCompactor {
+ private static final Logger logger =
LoggerFactory.getLogger(ReplicaCompactor.class);
+ private static final String CONSUMER_SELECTOR = String.format("%s LIKE
'%s'", ReplicaEventType.EVENT_TYPE_PROPERTY, ReplicaEventType.MESSAGE_ACK);
+ public static final int MAXIMUM_MESSAGES = 1_000;
+
+ private final Broker broker;
+ private final ConnectionContext connectionContext;
+ private final ReplicaReplicationQueueSupplier queueProvider;
+ private final PrefetchSubscription subscription;
+
+ private final Queue intermediateQueue;
+
+ public ReplicaCompactor(Broker broker, ConnectionContext
connectionContext, ReplicaReplicationQueueSupplier queueProvider,
PrefetchSubscription subscription) {
+ this.broker = broker;
+ this.connectionContext = connectionContext;
+ this.queueProvider = queueProvider;
+ this.subscription = subscription;
+
+ intermediateQueue =
broker.getDestinations(queueProvider.getIntermediateQueue()).stream().findFirst()
+ .map(DestinationExtractor::extractQueue).orElseThrow();
+ }
+
+ List<MessageReference> compactAndFilter(List<MessageReference> list,
boolean withAdditionalMessages) throws Exception {
+ List<DeliveredMessageReference> toProcess = list.stream()
+ .map(DeliveredMessageReference::new)
+ .collect(Collectors.toList());
+
+ int prefetchSize = subscription.getPrefetchSize();
+ try {
+ if (withAdditionalMessages) {
+ subscription.setPrefetchSize(0);
+ toProcess.addAll(getAdditionalMessages());
+ }
+
+ List<DeliveredMessageReference> processed =
compactAndFilter0(toProcess);
+
+ Set<MessageId> messageIds =
list.stream().map(MessageReference::getMessageId).collect(Collectors.toSet());
+
+ return processed.stream()
+ .map(dmr -> dmr.messageReference)
+ .filter(mr -> messageIds.contains(mr.getMessageId()))
+ .collect(Collectors.toList());
+ } finally {
+ subscription.setPrefetchSize(prefetchSize);
+ }
+ }
+
+ private List<DeliveredMessageReference> getAdditionalMessages() throws
Exception {
+ List<DeliveredMessageReference> result = new ArrayList<>();
+ List<QueueMessageReference> additionalMessages =
intermediateQueue.getMatchingMessages(connectionContext, CONSUMER_SELECTOR,
MAXIMUM_MESSAGES);
Review Comment:
Using a selector here means you're doing a full scan of the queue every
time. That seems expensive!
##########
activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaCompactor.java:
##########
@@ -0,0 +1,303 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.replica;
+
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ConsumerBrokerExchange;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.PrefetchSubscription;
+import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.broker.region.QueueMessageReference;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.LocalTransactionId;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatchNotification;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.util.LongSequenceGenerator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class ReplicaCompactor {
+ private static final Logger logger =
LoggerFactory.getLogger(ReplicaCompactor.class);
+ private static final String CONSUMER_SELECTOR = String.format("%s LIKE
'%s'", ReplicaEventType.EVENT_TYPE_PROPERTY, ReplicaEventType.MESSAGE_ACK);
+ public static final int MAXIMUM_MESSAGES = 1_000;
+
+ private final Broker broker;
+ private final ConnectionContext connectionContext;
+ private final ReplicaReplicationQueueSupplier queueProvider;
+ private final PrefetchSubscription subscription;
+
+ private final Queue intermediateQueue;
+
+ public ReplicaCompactor(Broker broker, ConnectionContext
connectionContext, ReplicaReplicationQueueSupplier queueProvider,
PrefetchSubscription subscription) {
+ this.broker = broker;
+ this.connectionContext = connectionContext;
+ this.queueProvider = queueProvider;
+ this.subscription = subscription;
+
+ intermediateQueue =
broker.getDestinations(queueProvider.getIntermediateQueue()).stream().findFirst()
+ .map(DestinationExtractor::extractQueue).orElseThrow();
+ }
+
+ List<MessageReference> compactAndFilter(List<MessageReference> list,
boolean withAdditionalMessages) throws Exception {
+ List<DeliveredMessageReference> toProcess = list.stream()
+ .map(DeliveredMessageReference::new)
+ .collect(Collectors.toList());
+
+ int prefetchSize = subscription.getPrefetchSize();
+ try {
+ if (withAdditionalMessages) {
+ subscription.setPrefetchSize(0);
+ toProcess.addAll(getAdditionalMessages());
+ }
+
+ List<DeliveredMessageReference> processed =
compactAndFilter0(toProcess);
+
+ Set<MessageId> messageIds =
list.stream().map(MessageReference::getMessageId).collect(Collectors.toSet());
+
+ return processed.stream()
+ .map(dmr -> dmr.messageReference)
+ .filter(mr -> messageIds.contains(mr.getMessageId()))
+ .collect(Collectors.toList());
+ } finally {
+ subscription.setPrefetchSize(prefetchSize);
+ }
+ }
+
+ private List<DeliveredMessageReference> getAdditionalMessages() throws
Exception {
+ List<DeliveredMessageReference> result = new ArrayList<>();
+ List<QueueMessageReference> additionalMessages =
intermediateQueue.getMatchingMessages(connectionContext, CONSUMER_SELECTOR,
MAXIMUM_MESSAGES);
+ if (additionalMessages.isEmpty()) {
+ return result;
+ }
+
+ String selector = String.format("%s IN %s",
ReplicaSupport.MESSAGE_ID_PROPERTY, getAckedMessageIds(additionalMessages));
+
additionalMessages.addAll(intermediateQueue.getMatchingMessages(connectionContext,
selector, MAXIMUM_MESSAGES));
+
+ Set<MessageId> dispatchedMessageIds =
subscription.getDispatched().stream()
+ .map(MessageReference::getMessageId)
+ .collect(Collectors.toSet());
+
+ for (MessageReference messageReference : additionalMessages) {
+ if
(!dispatchedMessageIds.contains(messageReference.getMessageId())) {
+ result.add(new DeliveredMessageReference(messageReference,
false));
+ }
+ }
+
+ return result;
+ }
+
+ private List<DeliveredMessageReference>
compactAndFilter0(List<DeliveredMessageReference> list) throws Exception {
+ List<DeliveredMessageReference> result = new ArrayList<>(list);
+
+ List<Destination> destinations = combineByDestination(list);
+
+ List<DeliveredMessageId> toDelete = compact(destinations);
+
+ if (toDelete.isEmpty()) {
+ return result;
+ }
+
+ acknowledge(toDelete);
+
+ List<MessageId> messageIds = toDelete.stream().map(dmid ->
dmid.messageId).collect(Collectors.toList());
Review Comment:
Use a set?
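
I'm assuming the ids are only used for `contains` checks further down (I haven't confirmed that from this hunk). If so, a sketch like the following keeps each lookup constant-time instead of scanning a list. `ToDeleteFilter` and the `String` ids are placeholders for the real types.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper; String ids stand in for MessageId.
class ToDeleteFilter {
    static List<String> withoutDeleted(List<String> all, List<String> toDelete) {
        // HashSet gives O(1) expected contains(), versus O(n) for a List.
        Set<String> deletedIds = new HashSet<>(toDelete);
        return all.stream()
                .filter(id -> !deletedIds.contains(id))
                .collect(Collectors.toList());
    }
}
```

In the stream in this PR that would just mean ending with `Collectors.toSet()` rather than `Collectors.toList()`.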
##########
activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaBroker.java:
##########
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.replica;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQMessageConsumer;
+import org.apache.activemq.ActiveMQPrefetchPolicy;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.BrokerFilter;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.MessageDispatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.JMSException;
+import java.lang.reflect.Method;
+import java.text.MessageFormat;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static java.util.Objects.requireNonNull;
+
+public class ReplicaBroker extends BrokerFilter {
+
+ private final static long REPLICA_ACK_PERIOD = 5_000;
+
+ private final Logger logger = LoggerFactory.getLogger(ReplicaBroker.class);
+ private final ScheduledExecutorService brokerConnectionPoller =
Executors.newSingleThreadScheduledExecutor();
+ private final ScheduledExecutorService periodicAckPoller =
Executors.newSingleThreadScheduledExecutor();
+ private final AtomicBoolean isConnecting = new AtomicBoolean();
+ private final AtomicReference<ActiveMQConnection> connection = new
AtomicReference<>();
+ private final AtomicReference<ActiveMQSession> connectionSession = new
AtomicReference<>();
+ private final AtomicReference<ActiveMQMessageConsumer> eventConsumer = new
AtomicReference<>();
+ private final ReplicaReplicationQueueSupplier queueProvider;
+ private final ActiveMQConnectionFactory replicaSourceConnectionFactory;
+ private final PeriodAcknowledge periodAcknowledgeCallBack;
+
+ public ReplicaBroker(Broker next, ReplicaReplicationQueueSupplier
queueProvider, ActiveMQConnectionFactory replicaSourceConnectionFactory) {
+ super(next);
+ this.queueProvider = queueProvider;
+ this.periodAcknowledgeCallBack = new
PeriodAcknowledge(REPLICA_ACK_PERIOD);
+ this.replicaSourceConnectionFactory =
requireNonNull(replicaSourceConnectionFactory, "Need connection details of
replica source for this broker");
+ requireNonNull(replicaSourceConnectionFactory.getBrokerURL(), "Need
connection URI of replica source for this broker");
+ validateUser(replicaSourceConnectionFactory);
+ }
+
+ private void validateUser(ActiveMQConnectionFactory
replicaSourceConnectionFactory) {
+ if (replicaSourceConnectionFactory.getUserName() != null) {
+ requireNonNull(replicaSourceConnectionFactory.getPassword(), "Both
userName and password or none of them should be configured for replica broker");
+ }
+ if (replicaSourceConnectionFactory.getPassword() != null) {
+ requireNonNull(replicaSourceConnectionFactory.getUserName(), "Both
userName and password or none of them should be configured for replica broker");
+ }
+ }
+
+ @Override
+ public void start() throws Exception {
+ super.start();
+ queueProvider.initializeSequenceQueue();
+
brokerConnectionPoller.scheduleAtFixedRate(this::beginReplicationIdempotent, 5,
5, TimeUnit.SECONDS);
Review Comment:
Is 5 seconds meaningful here? Can you use `REPLICA_ACK_PERIOD`?
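
Whichever constant turns out to be the right one, the main thing is to name it. A sketch with a hypothetical `RECONNECT_PERIOD_MS` (not in the PR), kept in milliseconds so it uses the same unit as `REPLICA_ACK_PERIOD`:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper showing a named period instead of a bare 5.
class ConnectionPoller {
    // Assumed name and value; could equally be an alias of REPLICA_ACK_PERIOD.
    static final long RECONNECT_PERIOD_MS = 5_000;

    static ScheduledExecutorService start(Runnable task, long periodMs) {
        ScheduledExecutorService poller = Executors.newSingleThreadScheduledExecutor();
        // Same value for the initial delay and the period, as in the PR.
        poller.scheduleAtFixedRate(task, periodMs, periodMs, TimeUnit.MILLISECONDS);
        return poller;
    }
}
```

If the two periods are meant to be independent, a separate constant makes that explicit. If they are meant to move together, reusing `REPLICA_ACK_PERIOD` directly says so.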
##########
activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaCompactor.java:
##########
@@ -0,0 +1,303 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.replica;
+
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ConsumerBrokerExchange;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.PrefetchSubscription;
+import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.broker.region.QueueMessageReference;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.LocalTransactionId;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatchNotification;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.util.LongSequenceGenerator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class ReplicaCompactor {
+ private static final Logger logger =
LoggerFactory.getLogger(ReplicaCompactor.class);
+ private static final String CONSUMER_SELECTOR = String.format("%s LIKE
'%s'", ReplicaEventType.EVENT_TYPE_PROPERTY, ReplicaEventType.MESSAGE_ACK);
+ public static final int MAXIMUM_MESSAGES = 1_000;
Review Comment:
Configurable?
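
One possible shape, as a sketch: keep 1_000 as the default but let the value be passed in. `CompactorSettings` is a made-up holder class; in the PR this would more likely be an extra constructor argument on `ReplicaCompactor`, wired from the plugin configuration.

```java
// Hypothetical settings holder; the class and accessor names are assumptions.
class CompactorSettings {
    static final int DEFAULT_MAXIMUM_MESSAGES = 1_000;

    private final int maximumMessages;

    CompactorSettings() {
        this(DEFAULT_MAXIMUM_MESSAGES);
    }

    CompactorSettings(int maximumMessages) {
        if (maximumMessages <= 0) {
            throw new IllegalArgumentException("maximumMessages must be positive");
        }
        this.maximumMessages = maximumMessages;
    }

    int getMaximumMessages() {
        return maximumMessages;
    }
}
```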
##########
activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaBatcher.java:
##########
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.replica;
+
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.command.ActiveMQMessage;
+
+import javax.jms.JMSException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class ReplicaBatcher {
+
+ static final int MAX_BATCH_LENGTH = 500;
+ static final int MAX_BATCH_SIZE = 5_000_000; // 5 Mb
+
+ @SuppressWarnings("unchecked")
+ static List<List<MessageReference>> batches(List<MessageReference> list)
throws Exception {
+ List<List<MessageReference>> result = new ArrayList<>();
+
+ Map<String, Set<String>> destination2eventType = new HashMap<>();
Review Comment:
`destinationToEventType`?
##########
activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaRole.java:
##########
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.replica;
+
+public enum ReplicaRole {
+ source, replica, dual
Review Comment:
Is `replica` used? It looks like only `source` and `dual` are supported in
`ReplicaPlugin`.
##########
activemq-broker/src/test/java/org/apache/activemq/replica/ReplicaCompactorTest.java:
##########
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.replica;
+
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.PrefetchSubscription;
+import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.store.MessageStore;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import java.util.List;
+import java.util.Set;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class ReplicaCompactorTest {
+
+ private final ConnectionContext connectionContext =
mock(ConnectionContext.class);
+ private final Broker broker = mock(Broker.class);
+ private final ReplicaReplicationQueueSupplier queueProvider =
mock(ReplicaReplicationQueueSupplier.class);
+ private final MessageStore messageStore = mock(MessageStore.class);
+
+ private final ActiveMQQueue intermediateQueueDestination = new
ActiveMQQueue(ReplicaSupport.INTERMEDIATE_REPLICATION_QUEUE_NAME);
+ private final Queue intermediateQueue = mock(Queue.class);
+
+ private ReplicaCompactor replicaCompactor;
+
+ @Before
+ public void setUp() throws Exception {
+ ConnectionContext adminConnectionContext =
mock(ConnectionContext.class);
+ when(adminConnectionContext.copy()).thenReturn(connectionContext);
+
when(broker.getAdminConnectionContext()).thenReturn(adminConnectionContext);
+
+
when(queueProvider.getIntermediateQueue()).thenReturn(intermediateQueueDestination);
+
when(broker.getDestinations(intermediateQueueDestination)).thenReturn(Set.of(intermediateQueue));
+ when(intermediateQueue.getMessageStore()).thenReturn(messageStore);
+
+ ConsumerInfo consumerInfo = new ConsumerInfo();
+ PrefetchSubscription originalSubscription =
mock(PrefetchSubscription.class);
+ when(originalSubscription.getConsumerInfo()).thenReturn(consumerInfo);
+
+ replicaCompactor = new ReplicaCompactor(broker, connectionContext,
queueProvider, originalSubscription);
+ }
+
+ @Test
+ public void compactWhenSendAndAck() throws Exception {
+ MessageId messageId1 = new MessageId("1:0:0:1");
+ MessageId messageId2 = new MessageId("1:0:0:2");
+ MessageId messageId3 = new MessageId("1:0:0:3");
+
+ String messageIdToAck = "2:1";
+
+ ActiveMQMessage message1 = new ActiveMQMessage();
+ message1.setMessageId(messageId1);
+
message1.setBooleanProperty(ReplicaSupport.IS_ORIGINAL_MESSAGE_SENT_TO_QUEUE_PROPERTY,
true);
+ message1.setStringProperty(ReplicaEventType.EVENT_TYPE_PROPERTY,
ReplicaEventType.MESSAGE_SEND.toString());
+ message1.setStringProperty(ReplicaSupport.MESSAGE_ID_PROPERTY,
messageIdToAck);
Review Comment:
Why is there a separate `ReplicaSupport.MESSAGE_ID_PROPERTY`? Why not just use
the message id you're already setting?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]