kgiusti commented on a change in pull request #554: DISPATCH-1266: Fix 
unsettled multicast forwarding
URL: https://github.com/apache/qpid-dispatch/pull/554#discussion_r317637876
 
 

 ##########
 File path: src/router_core/delivery.c
 ##########
 @@ -558,81 +572,386 @@ void qdr_delivery_decref_CT(qdr_core_t *core, 
qdr_delivery_t *dlv, const char *l
 }
 
 
+// the remote endpoint has changed the state (disposition) or settlement for the
+// delivery.  Update the local state/settlement accordingly.
+//
 static void qdr_update_delivery_CT(qdr_core_t *core, qdr_action_t *action, 
bool discard)
 {
-    qdr_delivery_t *dlv        = action->args.delivery.delivery;
-    qdr_delivery_t *peer       = qdr_delivery_first_peer_CT(dlv);
-    bool            push       = false;
-    bool            peer_moved = false;
-    bool            dlv_moved  = false;
-    uint64_t        disp       = action->args.delivery.disposition;
-    bool            settled    = action->args.delivery.settled;
-    qdr_error_t    *error      = action->args.delivery.error;
-    bool error_unassigned      = true;
+    if (discard)
+        return;
+
+    qdr_delivery_t *dlv      = action->args.delivery.delivery;
+    qdr_delivery_t *peer     = qdr_delivery_first_peer_CT(dlv);
+    uint64_t        new_disp = action->args.delivery.disposition;
+    bool            settled  = action->args.delivery.settled;
+    qdr_error_t    *error    = action->args.delivery.error;
+    bool free_error          = true;
+
+    if (dlv->multicast) {
+        //
+        // remote state change for *inbound* multicast delivery,
+        // update downstream *outbound* peers
+        //
+        qdr_delivery_mcast_update_CT(core, dlv, new_disp, settled);
 
-    qdr_link_t *dlv_link  = qdr_delivery_link(dlv);
-    qdr_link_t *peer_link = qdr_delivery_link(peer);
+    } else if (peer && peer->multicast) {
+        //
+        // remote state change for an *outbound* delivery to a multicast 
address,
+        // propagate upstream to *inbound* delivery (peer)
+        //
+        // coverity[swapped_arguments]
+        qdr_delivery_mcast_peer_update_CT(core, peer, dlv, new_disp, settled);
+
+    } else {
+        //
+        // Unicast forwarding
+        //
+        free_error = !qdr_delivery_ucast_update_CT(core, dlv, peer, new_disp, 
settled, error);
+    }
 
     //
-    // Logic:
+    // Release the action reference, possibly freeing the delivery
     //
-    // If disposition has changed and there is a peer link, set the 
disposition of the peer
-    // If settled, the delivery must be unlinked and freed.
-    // If settled and there is a peer, the peer shall be settled and unlinked. 
 It shall not
-    //   be freed until the connection-side thread settles the PN delivery.
+    qdr_delivery_decref_CT(core, dlv, "qdr_update_delivery_CT - remove from 
action");
+
+    if (free_error)
+        qdr_error_free(error);
+}
+
+
+// The remote delivery state (disposition) and/or remote settlement for a
+// unicast delivery has changed.  Propagate the changes to its peer delivery.
+//
+// returns true if ownership of error parameter is taken (do not free it)
+//
+static bool qdr_delivery_ucast_update_CT(qdr_core_t *core, qdr_delivery_t *dlv,
+                                         qdr_delivery_t *peer, uint64_t 
new_disp, bool settled,
+                                         qdr_error_t *error)
+{
+    bool push           = false;
+    bool peer_moved     = false;
+    bool dlv_moved      = false;
+    bool error_assigned = false;
+    qdr_link_t *dlink   = qdr_delivery_link(dlv);
+
+    assert(!dlv->multicast);
+    assert(!peer || !peer->multicast);
+
+    if (peer)
+        qdr_delivery_incref(peer, "qdr_delivery_ucast_update_CT - prevent peer 
from being freed");
+
+    //
+    // Non-multicast Logic:
+    //
+    // If disposition has changed and there is a peer link, set the disposition
+    // of the peer
+    // If remote settled, the delivery must be unlinked and freed.
+    // If remote settled and there is a peer, the peer shall be settled and
+    // unlinked.  It shall not be freed until the connection-side thread
+    // settles the PN delivery.
     //
-    if (disp != dlv->disposition) {
+    if (new_disp != dlv->remote_disposition) {
         //
-        // Disposition has changed, propagate the change to the peer delivery.
+        // Remote disposition has changed, propagate the change to the peer
+        // delivery local disposition.
         //
-        dlv->disposition = disp;
+        dlv->remote_disposition = new_disp;
         if (peer) {
-            peer->disposition = disp;
+            peer->disposition = new_disp;
             peer->error       = error;
             push = true;
-            error_unassigned = false;
+            error_assigned = true;
             qdr_delivery_copy_extension_state(dlv, peer, false);
         }
     }
 
     if (settled) {
         if (peer) {
             peer->settled = true;
-            if (peer_link) {
+            if (qdr_delivery_link(peer)) {
                 peer_moved = qdr_delivery_settled_CT(core, peer);
-                if (peer_moved)
-                    push = true;
             }
             qdr_delivery_unlink_peers_CT(core, dlv, peer);
         }
 
-        if (dlv_link)
+        if (dlink)
             dlv_moved = qdr_delivery_settled_CT(core, dlv);
     }
 
     //
     // If the delivery's link has a core endpoint, notify the endpoint of the 
update
     //
-    if (dlv_link && dlv_link->core_endpoint)
-        qdrc_endpoint_do_update_CT(core, dlv_link->core_endpoint, dlv, 
settled);
+    if (dlink && dlink->core_endpoint)
+        qdrc_endpoint_do_update_CT(core, dlink->core_endpoint, dlv, settled);
 
-    if (push)
+    if (push || peer_moved)
         qdr_delivery_push_CT(core, peer);
 
     //
-    // Release the action reference, possibly freeing the delivery
-    //
-    qdr_delivery_decref_CT(core, dlv, "qdr_update_delivery_CT - remove from 
action");
-
-    //
-    // Release the unsettled references if the deliveries were moved
+    // Release the unsettled references if the deliveries were moved/settled
     //
     if (dlv_moved)
-        qdr_delivery_decref_CT(core, dlv, "qdr_update_delivery_CT - removed 
from unsettled (1)");
+        qdr_delivery_decref_CT(core, dlv, "qdr_delivery_ucast_update CT - dlv 
removed from unsettled");
     if (peer_moved)
-        qdr_delivery_decref_CT(core, peer, "qdr_update_delivery_CT - removed 
from unsettled (2)");
-    if (error_unassigned)
-        qdr_error_free(error);
+        qdr_delivery_decref_CT(core, peer, "qdr_delivery_ucast_update_CT - 
peer removed from unsettled");
+    if (peer)
+        qdr_delivery_decref_CT(core, peer, "qdr_delivery_ucast_update_CT - 
allow free of peer");
+
+    return error_assigned;
+}
+
+
+// The remote delivery state (disposition) and/or remote settlement for an
+// incoming multicast delivery has changed.  Propagate the changes "downstream"
+// to the outbound peers.  Once all peers have settled then settle the in_dlv
+//
+void qdr_delivery_mcast_update_CT(qdr_core_t *core, qdr_delivery_t *in_dlv,
+                                  uint64_t new_disp, bool settled)
+{
+    if (!in_dlv)
+        return;
+
+    bool update_disp = new_disp && in_dlv->remote_disposition != new_disp;
+
+    assert(in_dlv->multicast);  // expect in_dlv to be the inbound delivery
+
+    qd_log(core->log, QD_LOG_TRACE,
+           "Remote updated mcast delivery (%p) disp=0x%"PRIx64" settled=%s",
+           in_dlv, new_disp, (settled) ? "True" : "False");
+
+    if (update_disp)
+        in_dlv->remote_disposition = new_disp;
+
+    qdr_delivery_t *out_peer = qdr_delivery_first_peer_CT(in_dlv);
+    while (out_peer) {
+        bool push  = false;
+        bool moved  = false;
+        bool unlink = false;
+
+        //
+        // AMQP 1.0 allows the sender to specify a disposition
+        // so forward it along
+        //
+        if (update_disp && out_peer->disposition != new_disp) {
+            out_peer->disposition = new_disp;
+            push = true;
+            // extension state/error ignored, not sure how
+            // that can be supported for mcast...
+        }
+
+        //
+        // the sender has settled
+        //
+        if (settled) {
+            out_peer->settled = true;
+            if (qdr_delivery_link(out_peer)) {
+                moved = qdr_delivery_settled_CT(core, out_peer);
+            }
+            unlink = true;
+        }
+
+        if (push || moved) {
+            qdr_delivery_push_CT(core, out_peer);
+        }
+
+        if (moved) {
+            qdr_delivery_decref_CT(core, out_peer,
+                                   "qdr_delivery_mcast_update_CT - removed 
out_peer from unsettled");
+        }
+
+        qd_log(core->log, QD_LOG_TRACE,
+               "Updating mcast delivery (%p) out peer (%p) updated disp=%s 
settled=%s",
+               in_dlv, out_peer, (push) ? "True" : "False",
+               (unlink) ? "True" : "False");
+
+        if (unlink) {
+            qdr_delivery_unlink_peers_CT(core, in_dlv, out_peer);  // may free 
out_peer!
+        }
+
+        out_peer = qdr_delivery_next_peer_CT(in_dlv);
+    }
+
+    if (settled) {
+        assert(qdr_delivery_peer_count_CT(in_dlv) == 0);
+        in_dlv->settled = true;
+        if (qdr_delivery_settled_CT(core, in_dlv)) {
+            qdr_delivery_decref_CT(core, in_dlv,
+                                   "qdr_delivery_mcast_update CT - in_dlv 
removed from unsettled");
+        }
+    }
+}
+
+
+// An outgoing peer delivery of an incoming multicast delivery has settled.
+// Settle the inbound delivery after all of its outbound deliveries
+// have been settled.
+//
+// Note: this call may free either in_dlv or out_peer by unlinking them. The
+// caller must increment the reference count for these deliveries if they are
+// to be referenced after this call.
+//
+// moved: set to true if in_dlv has been removed from the unsettled list
+// return: true if in_dlv terminal state was set
 
 Review comment:
   has been settled

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@qpid.apache.org
For additional commands, e-mail: dev-help@qpid.apache.org

Reply via email to