This is an automated email from the ASF dual-hosted git repository.

tross pushed a commit to branch dev-protocol-adaptors
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git

commit b3af7de4ce33660aff4db1c9b7dc419d8b733738
Author: Ted Ross <tr...@apache.org>
AuthorDate: Thu Jun 18 14:25:44 2020 -0400

    Dataplane: Added no_route and initial_delivery on link-first-attach.
---
 include/qpid/dispatch/protocol_adaptor.h |  4 ++
 src/adaptors/reference_adaptor.c         |  8 ++++
 src/router_core/connections.c            | 64 ++++++++++++++++++++++++++++++--
 src/router_core/router_core.c            | 14 +++++++
 src/router_core/router_core_private.h    |  2 +
 src/router_core/transfer.c               | 51 ++++++++++++++-----------
 src/router_node.c                        |  4 ++
 7 files changed, 122 insertions(+), 25 deletions(-)

diff --git a/include/qpid/dispatch/protocol_adaptor.h b/include/qpid/dispatch/protocol_adaptor.h
index 9920aba..bbf2f27 100644
--- a/include/qpid/dispatch/protocol_adaptor.h
+++ b/include/qpid/dispatch/protocol_adaptor.h
@@ -775,6 +775,8 @@ const char *qdr_link_name(const qdr_link_t *link);
  * @param target Target terminus of the attach
  * @param name - name of the link
  * @param terminus_addr - terminus address if any
+ * @param no_route If true, new deliveries are not to be routed to this link
+ * @param initial_delivery (optional) Move this delivery from its existing link to the head of this link's buffer
  * @param link_id - set to the management id of the new link
  * @return A pointer to a new qdr_link_t object to track the link
  */
@@ -784,6 +786,8 @@ qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn,
                                   qdr_terminus_t   *target,
                                   const char       *name,
                                   const char       *terminus_addr,
+                                  bool              no_route,
+                                  qdr_delivery_t   *initial_delivery,
                                   uint64_t         *link_id);
 
 /**
diff --git a/src/adaptors/reference_adaptor.c b/src/adaptors/reference_adaptor.c
index 7a65c17..59975a9 100644
--- a/src/adaptors/reference_adaptor.c
+++ b/src/adaptors/reference_adaptor.c
@@ -113,6 +113,8 @@ static void qdr_ref_second_attach(void *context, qdr_link_t *link,
                                                     target,           //qdr_terminus_t   *target,
                                                     "ref.1",          //const char       *name,
                                                     0,                //const char       *terminus_addr,
+                                                    false,            //bool              no_route
+                                                    0,                //qdr_delivery_t   *initial_delivery
                                                     &link_id);
 
         target = qdr_terminus(0);
@@ -123,6 +125,8 @@ static void qdr_ref_second_attach(void *context, qdr_link_t *link,
                                                     target,           //qdr_terminus_t   *target,
                                                     "ref.2",          //const char       *name,
                                                     0,                //const char       *terminus_addr,
+                                                    false,            //bool              no_route
+                                                    0,                //qdr_delivery_t   *initial_delivery
                                                     &link_id);
 
         source = qdr_terminus(0);
@@ -133,6 +137,8 @@ static void qdr_ref_second_attach(void *context, qdr_link_t *link,
                                                    qdr_terminus(0),  //qdr_terminus_t   *target,
                                                    "ref.3",          //const char       *name,
                                                    0,                //const char       *terminus_addr,
+                                                   false,            //bool              no_route
+                                                   0,                //qdr_delivery_t   *initial_delivery
                                                    &link_id);
     }
 }
@@ -366,6 +372,8 @@ static void on_startup(void *context)
                                                      qdr_terminus(0),  //qdr_terminus_t   *target,
                                                      "ref.0",          //const char       *name,
                                                      0,                //const char       *terminus_addr,
+                                                     false,            //bool              no_route
+                                                     0,                //qdr_delivery_t   *initial_delivery
                                                      &link_id);
 }
 
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 88c4737..d76f01c 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -543,6 +543,8 @@ qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn,
                                   qdr_terminus_t   *target,
                                   const char       *name,
                                   const char       *terminus_addr,
+                                  bool              no_route,
+                                  qdr_delivery_t   *initial_delivery,
                                   uint64_t         *link_id)
 {
     qdr_action_t   *action         = qdr_action(qdr_link_inbound_first_attach_CT, "link_first_attach");
@@ -573,6 +575,7 @@ qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn,
     link->core_ticks     = conn->core->uptime_ticks;
     link->zero_credit_time = conn->core->uptime_ticks;
     link->terminus_survives_disconnect = qdr_terminus_survives_disconnect(local_terminus);
+    link->no_route = no_route;
 
     link->strip_annotations_in  = conn->strip_annotations_in;
     link->strip_annotations_out = conn->strip_annotations_out;
@@ -595,6 +598,7 @@ qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn,
     action->args.connection.dir    = dir;
     action->args.connection.source = source;
     action->args.connection.target = target;
+    action->args.connection.initial_delivery = initial_delivery;
     qdr_action_enqueue(conn->core, action);
 
     return link;
@@ -1563,6 +1567,56 @@ static void qdr_attach_link_downlink_CT(qdr_core_t *core, qdr_connection_t *conn
 }
 
 
+static void qdr_link_process_initial_delivery_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_t *dlv)
+{
+    qdr_link_t *old_link  = safe_deref_qdr_link_t(dlv->link_sp);
+    int         ref_delta = 0;
+
+    //
+    // Remove the delivery from its current link if needed
+    //
+    if (!!old_link) {
+        switch (dlv->where) {
+        case QDR_DELIVERY_NOWHERE:
+            break;
+
+        case QDR_DELIVERY_IN_UNDELIVERED:
+            DEQ_REMOVE(old_link->undelivered, dlv);
+            dlv->where = QDR_DELIVERY_NOWHERE;
+            ref_delta--;
+            break;
+
+        case QDR_DELIVERY_IN_UNSETTLED:
+            DEQ_REMOVE(old_link->unsettled, dlv);
+            dlv->where = QDR_DELIVERY_NOWHERE;
+            ref_delta--;
+            break;
+
+        case QDR_DELIVERY_IN_SETTLED:
+            DEQ_REMOVE(old_link->settled, dlv);
+            dlv->where = QDR_DELIVERY_NOWHERE;
+            ref_delta--;
+            break;
+        }
+    }
+
+    //
+    // Enqueue the delivery onto the new link's undelivered list
+    //
+    set_safe_ptr_qdr_link_t(link, &dlv->link_sp);
+    qdr_forward_deliver_CT(core, link, dlv);
+
+    //
+    // Adjust the delivery's reference count
+    //
+    assert(ref_delta <= 0);
+    while (ref_delta < 0) {
+        qdr_delivery_decref(core, dlv, "qdr_link_process_initial_delivery_CT");
+        ref_delta++;
+    }
+}
+
+
 static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
 {
     qdr_connection_t  *conn = safe_deref_qdr_connection_t(action->args.connection.conn);
@@ -1570,9 +1624,10 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
     if (discard || !conn || !link)
         return;
 
-    qd_direction_t     dir    = action->args.connection.dir;
-    qdr_terminus_t    *source = action->args.connection.source;
-    qdr_terminus_t    *target = action->args.connection.target;
+    qd_direction_t  dir         = action->args.connection.dir;
+    qdr_terminus_t *source      = action->args.connection.source;
+    qdr_terminus_t *target      = action->args.connection.target;
+    qdr_delivery_t *initial_dlv = action->args.connection.initial_delivery;
 
     //
     // Start the attach count.
@@ -1675,6 +1730,9 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
         //
         // Handle outgoing link cases
         //
+        if (initial_dlv)
+            qdr_link_process_initial_delivery_CT(core, link, initial_dlv);
+
         switch (link->link_type) {
         case QD_LINK_ENDPOINT: {
             if (core->addr_lookup_handler)
diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c
index 25c124a..f61ebfd 100644
--- a/src/router_core/router_core.c
+++ b/src/router_core/router_core.c
@@ -567,6 +567,13 @@ void qdr_core_bind_address_link_CT(qdr_core_t *core, qdr_address_t *addr, qdr_li
     if (key && (*key == QD_ITER_HASH_PREFIX_MOBILE))
         link->phase = (int) (key[1] - '0');
 
+    //
+    // If this link is configured as no-route, don't create any functional linkage between the
+    // link and the address beyond the owning_addr.
+    //
+    if (link->no_route)
+        return;
+
     if (link->link_direction == QD_OUTGOING) {
         qdr_add_link_ref(&addr->rlinks, link, QDR_LINK_LIST_CLASS_ADDRESS);
         if (DEQ_SIZE(addr->rlinks) == 1) {
@@ -599,6 +606,13 @@ void qdr_core_unbind_address_link_CT(qdr_core_t *core, qdr_address_t *addr, qdr_
 {
     link->owning_addr = 0;
 
+    //
+    // If the link is configured as no_route, there will be no further link/address
+    // linkage to disconnect.
+    //
+    if (link->no_route)
+        return;
+
     if (link->link_direction == QD_OUTGOING) {
         qdr_del_link_ref(&addr->rlinks, link, QDR_LINK_LIST_CLASS_ADDRESS);
         if (DEQ_SIZE(addr->rlinks) == 0) {
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index ac123e5..1709442 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -136,6 +136,7 @@ struct qdr_action_t {
             qd_detach_type_t     dt;
             int                  credit;
             bool                 drain;
+            qdr_delivery_t      *initial_delivery;
         } connection;
 
         //
@@ -464,6 +465,7 @@ struct qdr_link_t {
     bool                     streaming;         ///< True if this link can be reused for streaming msgs
     bool                     in_streaming_pool; ///< True if this link is in the connections standby pool STREAMING_POOL
     bool                     terminus_survives_disconnect;
+    bool                     no_route;          ///< True if this link is to not receive routed deliveries
     char                    *strip_prefix;
     char                    *insert_prefix;
 
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index add176a..6ca2b2b 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -151,7 +151,8 @@ int qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit)
             dlv = DEQ_HEAD(link->undelivered);
             if (dlv) {
                 qdr_delivery_incref(dlv, "qdr_link_process_deliveries - holding the undelivered delivery locally");
-                uint64_t new_disp = 0;
+                uint64_t new_disp    = 0;
+                bool     to_new_link = false;  ///< Delivery got moved to a new link by the handler
 
                 // DISPATCH-1302 race hack fix: There is a race between the CORE thread
                 // and the outbound (this) thread over settlement. It occurs when the CORE
@@ -165,9 +166,13 @@ int qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit)
                     sys_mutex_unlock(conn->work_lock);
                     new_disp = conn->protocol_adaptor->deliver_handler(conn->protocol_adaptor->user_context, link, dlv, settled);
                     sys_mutex_lock(conn->work_lock);
-                } while (settled != dlv->settled);  // oops missed the settlement
+                    if (safe_deref_qdr_link_t(dlv->link_sp) != link) {
+                        to_new_link = true;
+                        break;
+                    }
+                } while (settled != dlv->settled && !to_new_link);  // oops missed the settlement
                 send_complete = qdr_delivery_send_complete(dlv);
-                if (send_complete) {
+                if (send_complete || to_new_link) {
                     //
                     // The entire message has been sent. It is now the appropriate time to have the delivery removed
                     // from the head of the undelivered list and move it to the unsettled list if it is not settled.
@@ -178,26 +183,28 @@ int qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit)
                     link->credit_to_core--;
                     link->total_deliveries++;
 
-                    // DISPATCH-1153:
-                    // If the undelivered list is cleared the link may have detached.  Stop processing.
-                    offer = DEQ_SIZE(link->undelivered);
-                    if (offer == 0) {
-                        qdr_delivery_decref(core, dlv, "qdr_link_process_deliveries - release local reference - closed link");
-                        sys_mutex_unlock(conn->work_lock);
-                        return num_deliveries_completed;
-                    }
+                    if (!to_new_link) {
+                        // DISPATCH-1153:
+                        // If the undelivered list is cleared the link may have detached.  Stop processing.
+                        offer = DEQ_SIZE(link->undelivered);
+                        if (offer == 0) {
+                            qdr_delivery_decref(core, dlv, "qdr_link_process_deliveries - release local reference - closed link");
+                            sys_mutex_unlock(conn->work_lock);
+                            return num_deliveries_completed;
+                        }
 
-                    assert(dlv == DEQ_HEAD(link->undelivered));
-                    DEQ_REMOVE_HEAD(link->undelivered);
-                    dlv->link_work = 0;
-
-                    if (settled || qdr_delivery_oversize(dlv) || qdr_delivery_is_aborted(dlv)) {
-                        dlv->where = QDR_DELIVERY_NOWHERE;
-                        qdr_delivery_decref(core, dlv, "qdr_link_process_deliveries - remove from undelivered list");
-                    } else {
-                        DEQ_INSERT_TAIL(link->unsettled, dlv);
-                        dlv->where = QDR_DELIVERY_IN_UNSETTLED;
-                        qd_log(core->log, QD_LOG_DEBUG, "Delivery transfer:  dlv:%lx qdr_link_process_deliveries: undelivered-list -> unsettled-list", (long) dlv);
+                        assert(dlv == DEQ_HEAD(link->undelivered));
+                        DEQ_REMOVE_HEAD(link->undelivered);
+                        dlv->link_work = 0;
+
+                        if (settled || qdr_delivery_oversize(dlv) || qdr_delivery_is_aborted(dlv)) {
+                            dlv->where = QDR_DELIVERY_NOWHERE;
+                            qdr_delivery_decref(core, dlv, "qdr_link_process_deliveries - remove from undelivered list");
+                        } else {
+                            DEQ_INSERT_TAIL(link->unsettled, dlv);
+                            dlv->where = QDR_DELIVERY_IN_UNSETTLED;
+                            qd_log(core->log, QD_LOG_DEBUG, "Delivery transfer:  dlv:%lx qdr_link_process_deliveries: undelivered-list -> unsettled-list", (long) dlv);
+                        }
                     }
                 }
                 else {
diff --git a/src/router_node.c b/src/router_node.c
index bd60542..993a235 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -827,6 +827,8 @@ static int AMQP_incoming_link_handler(void* context, qd_link_t *link)
                                                        qdr_terminus(qd_link_remote_target(link)),
                                                        pn_link_name(qd_link_pn(link)),
                                                        terminus_addr,
+                                                       false,
+                                                       0,
                                                        &link_id);
     qd_link_set_link_id(link, link_id);
     qdr_link_set_context(qdr_link, link);
@@ -856,6 +858,8 @@ static int AMQP_outgoing_link_handler(void* context, qd_link_t *link)
                                                  qdr_terminus(qd_link_remote_target(link)),
                                                  pn_link_name(qd_link_pn(link)),
                                                  terminus_addr,
+                                                 false,
+                                                 0,
                                                  &link_id);
     qd_link_set_link_id(link, link_id);
     qdr_link_set_context(qdr_link, link);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to