gemmellr commented on code in PR #5641: URL: https://github.com/apache/activemq-artemis/pull/5641#discussion_r2056521548
########## artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeSenderInfo.java: ########## @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.protocol.amqp.connect.bridge; + +import java.util.Objects; +import java.util.UUID; + +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.utils.CompositeAddress; + +/** + * Information and identification interface for AMQP bridge senders that will be + * created on the remote peer as demand on the local broker is detected. The behavior + * and meaning of some APIs in this interface may vary slightly depending on the role + * of the sender (Address or Queue). + */ +public class AMQPBridgeSenderInfo { + + enum Role { + /** + * Sender created from a match on a configured bridge to address policy. + */ + ADDRESS_SENDER, + + /** + * Sender created from a match on a configured bridge to queue policy. 
+ */ + QUEUE_SENDER + } + + private final Role role; + private final String localAddress; + private final String localQueue; + private final String localFqqn; + private final String remoteAddress; + private final RoutingType routingType; + private final String id; + + public AMQPBridgeSenderInfo(Role role, String localAddress, String localQueue, RoutingType routingType, String remoteAddress) { + this.role = role; + this.localAddress = localAddress; + this.localQueue = localQueue; + if (role == Role.QUEUE_SENDER) { + localFqqn = CompositeAddress.toFullyQualified(localAddress, localQueue).toString(); + } else { + localFqqn = null; + } + this.routingType = routingType; + this.remoteAddress = remoteAddress; + this.id = UUID.randomUUID().toString(); + } + + /** + * {@return a unique Id for the sender being represented.} Review Comment: Another '.' to remove here. ########## artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeSender.java: ########## @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.activemq.artemis.protocol.amqp.connect.bridge; + +import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.DETACH_FORCED; +import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.NOT_FOUND; +import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.RESOURCE_DELETED; + +import java.io.Closeable; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; + +import org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeMetrics.SenderMetrics; +import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException; +import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext; +import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext; +import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext; +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.transport.Detach; +import org.apache.qpid.proton.engine.Link; +import org.apache.qpid.proton.engine.Sender; + +/** + * Base implementation for AMQP Bridge sender implementations + */ +public abstract class AMQPBridgeSender implements Closeable { + + // Sequence ID value used to keep links that would otherwise have the same name from overlapping + // this generally occurs when a remote link detach is delayed and new resources are added before it + // arrives resulting in an unintended link stealing scenario in the proton engine. 
+ protected static final AtomicLong LINK_SEQUENCE_ID = new AtomicLong(); + + protected final AMQPBridgeManager bridgeManager; + protected final AMQPBridgeToPolicyManager policyManager; + protected final AMQPBridgeSenderConfiguration configuration; + protected final AMQPBridgeSenderInfo senderInfo; + protected final AMQPBridgePolicy policy; + protected final AMQPConnectionContext connection; + protected final AMQPSessionContext session; + protected final SenderMetrics metrics; + protected final AtomicBoolean closed = new AtomicBoolean(); + + protected ProtonServerSenderContext senderContext; + protected Sender protonSender; + protected volatile boolean initialized; + protected Consumer<AMQPBridgeSender> remoteOpenHandler; + protected Consumer<AMQPBridgeSender> remoteCloseHandler; + + public AMQPBridgeSender(AMQPBridgeToPolicyManager policyManager, + AMQPBridgeSenderConfiguration configuration, + AMQPSessionContext session, + AMQPBridgeSenderInfo senderInfo, + SenderMetrics metrics) { + this.policyManager = policyManager; + this.bridgeManager = policyManager.getBridgeManager(); + this.senderInfo = senderInfo; + this.policy = policyManager.getPolicy(); + this.connection = session.getAMQPConnectionContext(); + this.session = session; + this.configuration = configuration; + this.metrics = metrics; + } + + public boolean isClosed() { + return closed.get(); + } + + /** + * {@return <code>true</code> if the receiver has previously been initialized.} Review Comment: Another '.' to remove here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: gitbox-unsubscr...@activemq.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscr...@activemq.apache.org
For additional commands, e-mail: gitbox-h...@activemq.apache.org
For further information, visit: https://activemq.apache.org/contact