[ https://issues.apache.org/jira/browse/ARTEMIS-4366?focusedWorklogId=871512&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-871512 ]
ASF GitHub Bot logged work on ARTEMIS-4366: ------------------------------------------- Author: ASF GitHub Bot Created on: 18/Jul/23 09:48 Start Date: 18/Jul/23 09:48 Worklog Time Spent: 10m Work Description: gemmellr commented on code in PR #4555: URL: https://github.com/apache/activemq-artemis/pull/4555#discussion_r1266421706 ########## artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java: ########## @@ -75,7 +75,7 @@ public static String getMirrorAddress(String connectionName) { // We must use one referenceIDSupplier per server. // protocol manager is the perfect aggregation for that. - private ReferenceNodeStore referenceIDSupplier; + private ReferenceNodeStoreFactory referenceIDSupplier; Review Comment: Change makes the comment above seem inaccurate / contradictory (somewhat similar with the field name). There is now a factory being returned, and so the overall change here seems like it was to make sure there **is not** a single reference ID supplier per server as the comment says, but rather now one per-queue. ########## artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/ReferenceNodeStoreFactory.java: ########## @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.protocol.amqp.connect.mirror; + +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.MessageReference; +import org.apache.activemq.artemis.utils.collections.NodeStore; +import org.apache.activemq.artemis.utils.collections.NodeStoreFactory; + +import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_BROKER_ID_EXTRA_PROPERTY; +import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID_EXTRA_PROPERTY; + +public class ReferenceNodeStoreFactory implements NodeStoreFactory<MessageReference> { + + final ActiveMQServer server; + + private final String serverID; + + public ReferenceNodeStoreFactory(ActiveMQServer server) { + this.server = server; + this.serverID = server.getNodeID().toString(); + Review Comment: Superfluous newline ########## artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java: ########## @@ -125,11 +125,11 @@ public ProtonProtocolManager(ProtonProtocolManagerFactory factory, ActiveMQServe routingHandler = new AMQPRoutingHandler(server); } - public synchronized ReferenceNodeStore getReferenceIDSupplier() { + public synchronized ReferenceNodeStoreFactory getReferenceIDSupplier() { Review Comment: The method name also seems off now. ########## artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/ReferenceNodeStoreFactory.java: ########## @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.protocol.amqp.connect.mirror; + +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.MessageReference; +import org.apache.activemq.artemis.utils.collections.NodeStore; +import org.apache.activemq.artemis.utils.collections.NodeStoreFactory; + +import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_BROKER_ID_EXTRA_PROPERTY; +import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID_EXTRA_PROPERTY; + +public class ReferenceNodeStoreFactory implements NodeStoreFactory<MessageReference> { + + final ActiveMQServer server; + + private final String serverID; + + public ReferenceNodeStoreFactory(ActiveMQServer server) { + this.server = server; + this.serverID = server.getNodeID().toString(); + + } + + @Override + public NodeStore<MessageReference> newNodeStore() { + return new ReferenceNodeStore(this); + } + + public String getDefaultNodeID() { + return serverID; + } + + public String getServerID(MessageReference element) { + return getServerID(element.getMessage()); + } + + Review Comment: Superfluous newline ########## artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/ReferenceNodeStoreFactory.java: ########## @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.protocol.amqp.connect.mirror; + +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.MessageReference; +import org.apache.activemq.artemis.utils.collections.NodeStore; +import org.apache.activemq.artemis.utils.collections.NodeStoreFactory; + +import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_BROKER_ID_EXTRA_PROPERTY; +import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID_EXTRA_PROPERTY; + +public class ReferenceNodeStoreFactory implements NodeStoreFactory<MessageReference> { + + final ActiveMQServer server; + + private final String serverID; + + public ReferenceNodeStoreFactory(ActiveMQServer server) { + this.server = server; + this.serverID = server.getNodeID().toString(); + + } + + @Override + public NodeStore<MessageReference> newNodeStore() { + return new ReferenceNodeStore(this); + } + + public String getDefaultNodeID() { + return serverID; + } + + public String getServerID(MessageReference element) { + return getServerID(element.getMessage()); + } + + + public String getServerID(Message message) { + Object nodeID = message.getBrokerProperty(INTERNAL_BROKER_ID_EXTRA_PROPERTY); + if (nodeID != null) { + return nodeID.toString(); + } else { + // it is important to return null here, as the MirrorSource is expecting it to be null + // in the case the nodeID being from the originating server. + // don't be tempted to return this.serverID here. + return null; + } + } + + public long getID(MessageReference element) { + Message message = element.getMessage(); + Long id = getID(message); + if (id == null) { + return element.getMessageID(); + } else { + return id; + } + } + + private Long getID(Message message) { + return (Long)message.getBrokerProperty(INTERNAL_ID_EXTRA_PROPERTY); + } + + Review Comment: Superfluous newline ########## artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/ReferenceNodeStore.java: ########## @@ -20,20 +20,16 @@ import io.netty.util.collection.LongObjectHashMap; import org.apache.activemq.artemis.api.core.Message; -import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.utils.collections.NodeStore; import org.apache.activemq.artemis.utils.collections.LinkedListImpl; -import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_BROKER_ID_EXTRA_PROPERTY; -import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID_EXTRA_PROPERTY; - public class ReferenceNodeStore implements NodeStore<MessageReference> { - private final String serverID; + private final ReferenceNodeStoreFactory factory; Review Comment: It seems especially weird for a ReferenceNodeStore to contain the ReferenceNodeStoreFactory that creates ReferenceNodeStore. One which it then only uses to get IDs (which are then really just taken from the message [reference]). Feels like most or all this usage should be a different interface/ object. Issue Time Tracking ------------------- Worklog Id: (was: 871512) Time Spent: 0.5h (was: 20m) > Addresses with multiple subscriptions are not working with Mirroring > -------------------------------------------------------------------- > > Key: ARTEMIS-4366 > URL: https://issues.apache.org/jira/browse/ARTEMIS-4366 > Project: ActiveMQ Artemis > Issue Type: Bug > Reporter: Clebert Suconic > Priority: Major > Time Spent: 0.5h > Remaining Estimate: 0h > > The NodeStore will be reused by the two Queues on the same address. Each > queue should have its own. -- This message was sent by Atlassian Jira (v8.20.10#820010)