[ 
https://issues.apache.org/jira/browse/ARTEMIS-5437?focusedWorklogId=967313&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-967313
 ]

ASF GitHub Bot logged work on ARTEMIS-5437:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 23/Apr/25 17:11
            Start Date: 23/Apr/25 17:11
    Worklog Time Spent: 10m 
      Work Description: gemmellr commented on code in PR #5641:
URL: https://github.com/apache/activemq-artemis/pull/5641#discussion_r2056521548


##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeSenderInfo.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.connect.bridge;
+
+import java.util.Objects;
+import java.util.UUID;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.utils.CompositeAddress;
+
+/**
+ * Information and identification interface for AMQP bridge senders that will 
be
+ * created on the remote peer as demand on the local broker is detected. The 
behavior
+ * and meaning of some APIs in this interface may vary slightly depending on 
the role
+ * of the sender (Address or Queue).
+ */
+public class AMQPBridgeSenderInfo {
+
+   enum Role {
+      /**
+       * Sender created from a match on a configured bridge to address policy.
+       */
+      ADDRESS_SENDER,
+
+      /**
+       * Sender created from a match on a configured bridge to queue policy.
+       */
+      QUEUE_SENDER
+   }
+
+   private final Role role;
+   private final String localAddress;
+   private final String localQueue;
+   private final String localFqqn;
+   private final String remoteAddress;
+   private final RoutingType routingType;
+   private final String id;
+
+   public AMQPBridgeSenderInfo(Role role, String localAddress, String 
localQueue, RoutingType routingType, String remoteAddress) {
+      this.role = role;
+      this.localAddress = localAddress;
+      this.localQueue = localQueue;
+      if (role == Role.QUEUE_SENDER) {
+         localFqqn = CompositeAddress.toFullyQualified(localAddress, 
localQueue).toString();
+      } else {
+         localFqqn = null;
+      }
+      this.routingType = routingType;
+      this.remoteAddress = remoteAddress;
+      this.id = UUID.randomUUID().toString();
+   }
+
+   /**
+    * {@return a unique Id for the sender being represented.}

Review Comment:
   Another '.' to remove here.



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeSender.java:
##########
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.protocol.amqp.connect.bridge;
+
+import static 
org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.DETACH_FORCED;
+import static 
org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.NOT_FOUND;
+import static 
org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.RESOURCE_DELETED;
+
+import java.io.Closeable;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+
+import 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeMetrics.SenderMetrics;
+import 
org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
+import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
+import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
+import 
org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.transport.Detach;
+import org.apache.qpid.proton.engine.Link;
+import org.apache.qpid.proton.engine.Sender;
+
+/**
+ * Base implementation for AMQP Bridge sender implementations
+ */
+public abstract class AMQPBridgeSender implements Closeable {
+
+   // Sequence ID value used to keep links that would otherwise have the same 
name from overlapping
+   // this generally occurs when a remote link detach is delayed and new 
resources are added before it
+   // arrives resulting in an unintended link stealing scenario in the proton 
engine.
+   protected static final AtomicLong LINK_SEQUENCE_ID = new AtomicLong();
+
+   protected final AMQPBridgeManager bridgeManager;
+   protected final AMQPBridgeToPolicyManager policyManager;
+   protected final AMQPBridgeSenderConfiguration configuration;
+   protected final AMQPBridgeSenderInfo senderInfo;
+   protected final AMQPBridgePolicy policy;
+   protected final AMQPConnectionContext connection;
+   protected final AMQPSessionContext session;
+   protected final SenderMetrics metrics;
+   protected final AtomicBoolean closed = new AtomicBoolean();
+
+   protected ProtonServerSenderContext senderContext;
+   protected Sender protonSender;
+   protected volatile boolean initialized;
+   protected Consumer<AMQPBridgeSender> remoteOpenHandler;
+   protected Consumer<AMQPBridgeSender> remoteCloseHandler;
+
+   public AMQPBridgeSender(AMQPBridgeToPolicyManager policyManager,
+                           AMQPBridgeSenderConfiguration configuration,
+                           AMQPSessionContext session,
+                           AMQPBridgeSenderInfo senderInfo,
+                           SenderMetrics metrics) {
+      this.policyManager = policyManager;
+      this.bridgeManager = policyManager.getBridgeManager();
+      this.senderInfo = senderInfo;
+      this.policy = policyManager.getPolicy();
+      this.connection = session.getAMQPConnectionContext();
+      this.session = session;
+      this.configuration = configuration;
+      this.metrics = metrics;
+   }
+
+   public boolean isClosed() {
+      return closed.get();
+   }
+
+   /**
+    * {@return <code>true</code> if the receiver has previously been 
initialized.}

Review Comment:
   Another '.' to remove here.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 967313)
    Time Spent: 40m  (was: 0.5h)

> Add a more advanced message bridging feature to AMQP broker connections
> -----------------------------------------------------------------------
>
>                 Key: ARTEMIS-5437
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-5437
>             Project: ActiveMQ Artemis
>          Issue Type: New Feature
>          Components: AMQP
>    Affects Versions: 2.40.0
>            Reporter: Timothy A. Bish
>            Assignee: Timothy A. Bish
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 40m
>  Remaining Estimate: 0h
>
> The AMQP broker connection currently offers a somewhat primitive bridge 
> capability by offering configuration of SENDER and RECEIVER elements which 
> can perform some basic bridging primarily between Artemis instances given the 
> way the send-to-address, receive-from-queue behavior is carried over into 
> the implementation.
> This new AMQP bridge feature offers a more advanced and flexible bridge 
> capability that allows for messages to be bridged from or to a remote with 
> various configuration options to account for the remote not being an Artemis 
> peer but leveraging some Artemis capabilities like Core message tunneling 
> over AMQP when the remote is an Artemis instance. By default the receivers 
> for this bridge implementation activate only on local demand being present 
> but can be configured to be always active as the older RECEIVER capability 
> currently behaves.
> This implementation also incorporates features implemented in the AMQP 
> federation bits that deal with draining link credit on receivers before 
> detaching to avoid potential duplicates and idling links if demand tracking 
> is active (default mode) to avoid rapid create and destroy cycles for bridge 
> receiver links.  There are also built-in link recovery mechanisms that will 
> attempt to recreate links that are closed by the remote on a periodic basis.  
> Also, like federation Queue receivers, the new bridge receivers can be 
> configured in pull mode to only offer credit when the local Queue has no 
> pending backlog to avoid moving messages until there is a need.
> An example of configuration for an AMQP broker connection bridge is shown below:
> {code:xml}
>          <amqp-connection uri="tcp://host:port" 
> name="my-bridge-configuration">
>             <bridge>
>                <bridge-from-queue name="policy-name-1">
>                   <include address-match="#" queue-match="someQueue" />
>                   <property key="amqpCredits" value="0"/>
>                   <property key="amqpPullConsumerCredits" value="10"/>
>                </bridge-from-queue>
>                <bridge-to-queue name="policy-name-2">
>                   <include address-match="test" queue-match="myQueue" />
>                </bridge-to-queue>
>                <bridge-from-address name="policy-name-3">
>                   <include address-match="test-address" />
>                   <exclude address-match="all.#" />
>                </bridge-from-address>
>                <bridge-to-address name="policy-name-4">
>                   <include address-match="send-to-address" />
>                   <exclude address-match="all.#" />
>                </bridge-to-address>
>             </bridge>
>          </amqp-connection>
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to