[
https://issues.apache.org/jira/browse/ARTEMIS-5437?focusedWorklogId=967313&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-967313
]
ASF GitHub Bot logged work on ARTEMIS-5437:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 23/Apr/25 17:11
Start Date: 23/Apr/25 17:11
Worklog Time Spent: 10m
Work Description: gemmellr commented on code in PR #5641:
URL: https://github.com/apache/activemq-artemis/pull/5641#discussion_r2056521548
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeSenderInfo.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.connect.bridge;
+
+import java.util.Objects;
+import java.util.UUID;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.utils.CompositeAddress;
+
+/**
+ * Information and identification interface for AMQP bridge senders that will
be
+ * created on the remote peer as demand on the local broker is detected. The
behavior
+ * and meaning of some APIs in this interface may vary slightly depending on
the role
+ * of the sender (Address or Queue).
+ */
+public class AMQPBridgeSenderInfo {
+
+ enum Role {
+ /**
+ * Sender created from a match on a configured bridge to address policy.
+ */
+ ADDRESS_SENDER,
+
+ /**
+ * Sender created from a match on a configured bridge to queue policy.
+ */
+ QUEUE_SENDER
+ }
+
+ private final Role role;
+ private final String localAddress;
+ private final String localQueue;
+ private final String localFqqn;
+ private final String remoteAddress;
+ private final RoutingType routingType;
+ private final String id;
+
+ public AMQPBridgeSenderInfo(Role role, String localAddress, String
localQueue, RoutingType routingType, String remoteAddress) {
+ this.role = role;
+ this.localAddress = localAddress;
+ this.localQueue = localQueue;
+ if (role == Role.QUEUE_SENDER) {
+ localFqqn = CompositeAddress.toFullyQualified(localAddress,
localQueue).toString();
+ } else {
+ localFqqn = null;
+ }
+ this.routingType = routingType;
+ this.remoteAddress = remoteAddress;
+ this.id = UUID.randomUUID().toString();
+ }
+
+ /**
+ * {@return a unique Id for the sender being represented.}
Review Comment:
Another '.' to remove here.
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeSender.java:
##########
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.protocol.amqp.connect.bridge;
+
+import static
org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.DETACH_FORCED;
+import static
org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.NOT_FOUND;
+import static
org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.RESOURCE_DELETED;
+
+import java.io.Closeable;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+
+import
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeMetrics.SenderMetrics;
+import
org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
+import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
+import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
+import
org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.transport.Detach;
+import org.apache.qpid.proton.engine.Link;
+import org.apache.qpid.proton.engine.Sender;
+
+/**
+ * Base implementation for AMQP Bridge sender implementations
+ */
+public abstract class AMQPBridgeSender implements Closeable {
+
+ // Sequence ID value used to keep links that would otherwise have the same
name from overlapping
+ // this generally occurs when a remote link detach is delayed and new
resources are added before it
+ // arrives resulting in an unintended link stealing scenario in the proton
engine.
+ protected static final AtomicLong LINK_SEQUENCE_ID = new AtomicLong();
+
+ protected final AMQPBridgeManager bridgeManager;
+ protected final AMQPBridgeToPolicyManager policyManager;
+ protected final AMQPBridgeSenderConfiguration configuration;
+ protected final AMQPBridgeSenderInfo senderInfo;
+ protected final AMQPBridgePolicy policy;
+ protected final AMQPConnectionContext connection;
+ protected final AMQPSessionContext session;
+ protected final SenderMetrics metrics;
+ protected final AtomicBoolean closed = new AtomicBoolean();
+
+ protected ProtonServerSenderContext senderContext;
+ protected Sender protonSender;
+ protected volatile boolean initialized;
+ protected Consumer<AMQPBridgeSender> remoteOpenHandler;
+ protected Consumer<AMQPBridgeSender> remoteCloseHandler;
+
+ public AMQPBridgeSender(AMQPBridgeToPolicyManager policyManager,
+ AMQPBridgeSenderConfiguration configuration,
+ AMQPSessionContext session,
+ AMQPBridgeSenderInfo senderInfo,
+ SenderMetrics metrics) {
+ this.policyManager = policyManager;
+ this.bridgeManager = policyManager.getBridgeManager();
+ this.senderInfo = senderInfo;
+ this.policy = policyManager.getPolicy();
+ this.connection = session.getAMQPConnectionContext();
+ this.session = session;
+ this.configuration = configuration;
+ this.metrics = metrics;
+ }
+
+ public boolean isClosed() {
+ return closed.get();
+ }
+
+ /**
+ * {@return <code>true</code> if the receiver has previously been
initialized.}
Review Comment:
Another '.' to remove here.
Issue Time Tracking
-------------------
Worklog Id: (was: 967313)
Time Spent: 40m (was: 0.5h)
> Add a more advanced message bridging feature to AMQP broker connections
> -----------------------------------------------------------------------
>
> Key: ARTEMIS-5437
> URL: https://issues.apache.org/jira/browse/ARTEMIS-5437
> Project: ActiveMQ Artemis
> Issue Type: New Feature
> Components: AMQP
> Affects Versions: 2.40.0
> Reporter: Timothy A. Bish
> Assignee: Timothy A. Bish
> Priority: Major
> Labels: pull-request-available
> Time Spent: 40m
> Remaining Estimate: 0h
>
> The AMQP broker connection currently offers a somewhat primitive bridge
> capability through the configuration of SENDER and RECEIVER elements, which
> can perform some basic bridging, primarily between Artemis instances, given the
> way the send-to-address, receive-from-queue behavior is carried over into
> the implementation.
> This new AMQP bridge feature offers a more advanced and flexible bridge
> capability that allows messages to be bridged from or to a remote, with
> various configuration options to account for the remote not being an Artemis
> peer, while still leveraging some Artemis capabilities, like Core message
> tunneling over AMQP, when the remote is an Artemis instance. By default, the
> receivers for this bridge implementation activate only when local demand is
> present, but they can be configured to be always active, as the older RECEIVER
> capability currently behaves.
> This implementation also incorporates features implemented in the AMQP
> federation bits that deal with draining link credit on receivers before
> detaching to avoid potential duplicates and idling links if demand tracking
> is active (default mode) to avoid rapid create and destroy cycles for bridge
> receiver links. There are also built-in link recovery mechanisms that will
> periodically attempt to recreate links that are closed by the remote.
> Also, like federation Queue receivers, the new bridge receivers can be
> configured in pull mode to only offer credit when the local Queue has no
> pending backlog, to avoid moving messages until there is a need.
> An example configuration for an AMQP broker connection bridge is shown below:
> {code:xml}
> <amqp-connection uri="tcp://host:port"
> name="my-bridge-configuration">
> <bridge>
> <bridge-from-queue name="policy-name-1">
> <include address-match="#" queue-match="someQueue" />
> <property key="amqpCredits" value="0"/>
> <property key="amqpPullConsumerCredits" value="10"/>
> </bridge-from-queue>
> <bridge-to-queue name="policy-name-2">
> <include address-match="test" queue-match="myQueue" />
> </bridge-to-queue>
> <bridge-from-address name="policy-name-3">
> <include address-match="test-address" />
> <exclude address-match="all.#" />
> </bridge-from-address>
> <bridge-to-address name="policy-name-4">
> <include address-match="send-to-address" />
> <exclude address-match="all.#" />
> </bridge-to-address>
> </bridge>
> </amqp-connection>
> {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact