kgiusti commented on a change in pull request #554: DISPATCH-1266: Fix
unsettled multicast forwarding
URL: https://github.com/apache/qpid-dispatch/pull/554#discussion_r317634516
##########
File path: src/router_core/delivery.c
##########
@@ -558,81 +572,386 @@ void qdr_delivery_decref_CT(qdr_core_t *core,
qdr_delivery_t *dlv, const char *l
}
+// the remote endpoint has change the state (disposition) or settlement for the
+// delivery. Update the local state/settlement accordingly.
+//
static void qdr_update_delivery_CT(qdr_core_t *core, qdr_action_t *action,
bool discard)
{
- qdr_delivery_t *dlv = action->args.delivery.delivery;
- qdr_delivery_t *peer = qdr_delivery_first_peer_CT(dlv);
- bool push = false;
- bool peer_moved = false;
- bool dlv_moved = false;
- uint64_t disp = action->args.delivery.disposition;
- bool settled = action->args.delivery.settled;
- qdr_error_t *error = action->args.delivery.error;
- bool error_unassigned = true;
+ if (discard)
+ return;
+
+ qdr_delivery_t *dlv = action->args.delivery.delivery;
+ qdr_delivery_t *peer = qdr_delivery_first_peer_CT(dlv);
+ uint64_t new_disp = action->args.delivery.disposition;
+ bool settled = action->args.delivery.settled;
+ qdr_error_t *error = action->args.delivery.error;
+ bool free_error = true;
+
+ if (dlv->multicast) {
+ //
+ // remote state change for *inbound* multicast delivery,
+ // update downstream *outbound* peers
+ //
+ qdr_delivery_mcast_update_CT(core, dlv, new_disp, settled);
- qdr_link_t *dlv_link = qdr_delivery_link(dlv);
- qdr_link_t *peer_link = qdr_delivery_link(peer);
+ } else if (peer && peer->multicast) {
+ //
+ // remote state change for an *outbound* delivery to a multicast
address,
+ // propagate upstream to *inbound* delivery (peer)
+ //
+ // coverity[swapped_arguments]
+ qdr_delivery_mcast_peer_update_CT(core, peer, dlv, new_disp, settled);
+
+ } else {
+ //
+ // Unicast forwarding
+ //
+ free_error = !qdr_delivery_ucast_update_CT(core, dlv, peer, new_disp,
settled, error);
+ }
//
- // Logic:
+ // Release the action reference, possibly freeing the delivery
//
- // If disposition has changed and there is a peer link, set the
disposition of the peer
- // If settled, the delivery must be unlinked and freed.
- // If settled and there is a peer, the peer shall be settled and unlinked.
It shall not
- // be freed until the connection-side thread settles the PN delivery.
+ qdr_delivery_decref_CT(core, dlv, "qdr_update_delivery_CT - remove from
action");
+
+ if (free_error)
+ qdr_error_free(error);
+}
+
+
+// The remote delivery state (disposition) and/or remote settlement for an
+// unicast delivery has changed. Propagate the changes to its peer delivery.
+//
+// returns true if ownership of error parameter is taken (do not free it)
+//
+static bool qdr_delivery_ucast_update_CT(qdr_core_t *core, qdr_delivery_t *dlv,
+ qdr_delivery_t *peer, uint64_t
new_disp, bool settled,
+ qdr_error_t *error)
+{
+ bool push = false;
+ bool peer_moved = false;
+ bool dlv_moved = false;
+ bool error_assigned = false;
+ qdr_link_t *dlink = qdr_delivery_link(dlv);
+
+ assert(!dlv->multicast);
+ assert(!peer || !peer->multicast);
+
+ if (peer)
+ qdr_delivery_incref(peer, "qdr_delivery_ucast_update_CT - prevent peer
from being freed");
+
+ //
+ // Non-multicast Logic:
+ //
+ // If disposition has changed and there is a peer link, set the disposition
+ // of the peer
+ // If remote settled, the delivery must be unlinked and freed.
+ // If remote settled and there is a peer, the peer shall be settled and
+ // unlinked. It shall not be freed until the connection-side thread
+ // settles the PN delivery.
//
- if (disp != dlv->disposition) {
+ if (new_disp != dlv->remote_disposition) {
//
- // Disposition has changed, propagate the change to the peer delivery.
+ // Remote disposition has changed, propagate the change to the peer
+ // delivery local disposition.
//
- dlv->disposition = disp;
+ dlv->remote_disposition = new_disp;
if (peer) {
- peer->disposition = disp;
+ peer->disposition = new_disp;
peer->error = error;
push = true;
- error_unassigned = false;
+ error_assigned = true;
qdr_delivery_copy_extension_state(dlv, peer, false);
}
}
if (settled) {
if (peer) {
peer->settled = true;
- if (peer_link) {
+ if (qdr_delivery_link(peer)) {
peer_moved = qdr_delivery_settled_CT(core, peer);
- if (peer_moved)
- push = true;
}
qdr_delivery_unlink_peers_CT(core, dlv, peer);
}
- if (dlv_link)
+ if (dlink)
dlv_moved = qdr_delivery_settled_CT(core, dlv);
}
//
// If the delivery's link has a core endpoint, notify the endpoint of the
update
//
- if (dlv_link && dlv_link->core_endpoint)
- qdrc_endpoint_do_update_CT(core, dlv_link->core_endpoint, dlv,
settled);
+ if (dlink && dlink->core_endpoint)
+ qdrc_endpoint_do_update_CT(core, dlink->core_endpoint, dlv, settled);
- if (push)
+ if (push || peer_moved)
qdr_delivery_push_CT(core, peer);
//
- // Release the action reference, possibly freeing the delivery
- //
- qdr_delivery_decref_CT(core, dlv, "qdr_update_delivery_CT - remove from
action");
-
- //
- // Release the unsettled references if the deliveries were moved
+ // Release the unsettled references if the deliveries were moved/settled
//
if (dlv_moved)
- qdr_delivery_decref_CT(core, dlv, "qdr_update_delivery_CT - removed
from unsettled (1)");
+ qdr_delivery_decref_CT(core, dlv, "qdr_delivery_ucast_update CT - dlv
removed from unsettled");
if (peer_moved)
- qdr_delivery_decref_CT(core, peer, "qdr_update_delivery_CT - removed
from unsettled (2)");
- if (error_unassigned)
- qdr_error_free(error);
+ qdr_delivery_decref_CT(core, peer, "qdr_delivery_ucast_update_CT -
peer removed from unsettled");
+ if (peer)
+ qdr_delivery_decref_CT(core, peer, "qdr_delivery_ucast_update_CT -
allow free of peer");
+
+ return error_assigned;
+}
+
+
+// The remote delivery state (disposition) and/or remote settlement for an
+// incoming multicast delivery has changed. Propagate the changes "downstream"
+// to the outbound peers. Once all peers have settled then settle the in_dlv
+//
+void qdr_delivery_mcast_update_CT(qdr_core_t *core, qdr_delivery_t *in_dlv,
+ uint64_t new_disp, bool settled)
+{
+ if (!in_dlv)
+ return;
+
+ bool update_disp = new_disp && in_dlv->remote_disposition != new_disp;
+
+ assert(in_dlv->multicast); // expect in_dlv to be the inbound delivery
+
+ qd_log(core->log, QD_LOG_TRACE,
+ "Remote updated mcast delivery (%p) disp=0x%"PRIx64" settled=%s",
+ in_dlv, new_disp, (settled) ? "True" : "False");
+
+ if (update_disp)
+ in_dlv->remote_disposition = new_disp;
+
+ qdr_delivery_t *out_peer = qdr_delivery_first_peer_CT(in_dlv);
+ while (out_peer) {
+ bool push = false;
+ bool moved = false;
+ bool unlink = false;
+
+ //
+ // AMQP 1.0 allows the sender to specify a disposition
+ // so forward it along
+ //
+ if (update_disp && out_peer->disposition != new_disp) {
+ out_peer->disposition = new_disp;
+ push = true;
+ // extension state/error ignored, not sure how
+ // that can be supported for mcast...
+ }
+
+ //
+ // the sender has settled
+ //
+ if (settled) {
+ out_peer->settled = true;
+ if (qdr_delivery_link(out_peer)) {
+ moved = qdr_delivery_settled_CT(core, out_peer);
+ }
+ unlink = true;
+ }
+
+ if (push || moved) {
+ qdr_delivery_push_CT(core, out_peer);
+ }
+
+ if (moved) {
+ qdr_delivery_decref_CT(core, out_peer,
+ "qdr_delivery_mcast_update_CT - removed
out_peer from unsettled");
+ }
+
+ qd_log(core->log, QD_LOG_TRACE,
+ "Updating mcast delivery (%p) out peer (%p) updated disp=%s
settled=%s",
+ in_dlv, out_peer, (push) ? "True" : "False",
+ (unlink) ? "True" : "False");
+
+ if (unlink) {
+ qdr_delivery_unlink_peers_CT(core, in_dlv, out_peer); // may free
out_peer!
+ }
+
+ out_peer = qdr_delivery_next_peer_CT(in_dlv);
+ }
+
+ if (settled) {
+ assert(qdr_delivery_peer_count_CT(in_dlv) == 0);
+ in_dlv->settled = true;
+ if (qdr_delivery_settled_CT(core, in_dlv)) {
+ qdr_delivery_decref_CT(core, in_dlv,
+ "qdr_delivery_mcast_update CT - in_dlv
removed from unsettled");
+ }
+ }
+}
+
+
+// An outgoing peer delivery of an incoming multicast delivery has settled.
+// Settle the inbound delivery after all of its outbound deliveries
+// have been settled.
+//
+// Note: this call may free either in_dlv or out_peer by unlinking them. The
+// caller must increment the reference count for these deliveries if they are
+// to be referenced after this call.
+//
+// moved: set to true if in_dlv has been removed from the unsettled list
+// return: true if in_dlv terminal state was set
+//
+static bool qdr_delivery_mcast_peer_settled_CT(qdr_core_t *core,
qdr_delivery_t *in_dlv,
+ qdr_delivery_t *out_peer, bool
*moved)
+{
+ bool push = false;
+ *moved = false;
+
+ assert(in_dlv->multicast && out_peer->peer == in_dlv);
+ assert(qdr_delivery_peer_count_CT(out_peer) == 1);
+
+ int peer_count = qdr_delivery_peer_count_CT(in_dlv);
+
+ if (peer_count == 1) {
+ //
+ // This out_peer is the last outgoing peer so
+ // we can now settle in_dlv
+ //
+ in_dlv->settled = true;
+ push = true;
+ if (qdr_delivery_link(in_dlv)) {
+ *moved = qdr_delivery_settled_CT(core, in_dlv);
+ }
+
+ qd_log(core->log, QD_LOG_TRACE,
+ "mcast delivery (%p) has settled, disp=0x%"PRIx64,
+ in_dlv, in_dlv->disposition);
+ } else {
+
+ qd_log(core->log, QD_LOG_TRACE,
+ "mcast delivery (%p) out peer (%p) has settled, remaining
peers=%d",
+ in_dlv, out_peer, peer_count - 1);
+ }
+
+ // now settle the peer itself and remove it from link unsettled list
+
+ out_peer->settled = true;
+ if (qdr_delivery_settled_CT(core, out_peer)) {
+ qdr_delivery_decref_CT(core, out_peer,
"qdr_delivery_mcast_peer_settled_CT - out_peer removed from unsettled");
+ }
+
+ // do this last since it may free either dlv or out_peer:
+ qdr_delivery_unlink_peers_CT(core, in_dlv, out_peer);
+
+ return push;
+}
+
+
+// true if delivery state d is a terminal state as defined by AMQP 1.0
+//
+#define IS_TERMINAL(d) (PN_ACCEPTED <= (d) && (d) <= PN_MODIFIED)
+
+
+// an outbound mcast delivery has changed its remote state (disposition)
+// propagate the change back "upstream" to the inbound delivery
+//
+// returns true if dlv disposition has been updated
+//
+static bool qdr_delivery_mcast_peer_state_CT(qdr_core_t *core, qdr_delivery_t
*in_dlv,
+ qdr_delivery_t *out_peer,
uint64_t new_disp)
+{
+ // The AMQP 1.0 spec does not define a way to propagate disposition
+ // back to the sender in the case of unsettled multicast. In the
+ // case of multiple different terminal states we have to reconcile
+ // them. We assign an ad hoc priority to each terminal value and
+ // set the final disposition to the highest priority returned
+ // across all receivers.
+ static const int priority[] = {
+ 2, // Accepted
+ 3, // Rejected - highest because reject is a hard error
+ 0, // Released
+ 1, // Modified
+ };
+ bool push = false;
+
+ if (!in_dlv || !out_peer)
+ return push;
+
+ assert(in_dlv->multicast && out_peer->peer == in_dlv);
+ assert(qdr_delivery_peer_count_CT(out_peer) == 1);
+
+ if (new_disp == 0x33) {
+ // hack alert - the transaction section of the AMQP 1.0 spec
+ // defines the Declared outcome (0x33) terminal state.
+ qd_log(core->log, QD_LOG_WARNING,
+ "Transactions are not supported for multicast messages");
+ new_disp = PN_REJECTED;
+ }
+
+ out_peer->remote_disposition = new_disp;
+
+ if (IS_TERMINAL(new_disp)) {
+ // our mcast impl ignores non-terminal outcomes
+
+ qd_log(core->log, QD_LOG_TRACE,
+ "mcast delivery (%p) out peer (%p) disp updated: 0x%"PRIx64,
+ in_dlv, out_peer, new_disp);
+
+ if (in_dlv->mcast_disposition == 0) {
Review comment:
do we need mcast_disposition or can we use "disposition"
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]