This is an automated email from the ASF dual-hosted git repository. astitcher pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
commit 81417af373b395a961a47ff226669d29f87f0622 Author: Andrew Stitcher <astitc...@apache.org> AuthorDate: Mon Jun 9 18:05:47 2025 -0400 PROTON-2873: Add a disposition for transaction declaration The original work adding in the transactional state disposition did not include the declared disposition used by the transaction coordinator to allocate a transaction id to a new transaction. Subsequent code used the transactional state disposition instead of the declared disposition, which is incorrect. --- c/include/proton/disposition.h | 40 ++++++++++++++++++++++++++- c/src/core/emitters.h | 14 +++++++++- c/src/core/engine-internal.h | 6 ++++ c/src/core/engine.c | 23 ++++++++++++++++ c/src/core/transport.c | 8 ++++++ c/tools/codec-generator/specs.json | 1 + python/cproton.h | 5 ++++ python/cproton.py | 14 ++++++++-- python/examples/broker.py | 8 +++--- python/proton/__init__.py | 3 +- python/proton/_delivery.py | 55 ++++++++++++++++++++++++++++++++++++- python/proton/_reactor.py | 2 +- python/tests/proton_tests/engine.py | 25 +++++++++++++++-- 13 files changed, 190 insertions(+), 14 deletions(-) diff --git a/c/include/proton/disposition.h b/c/include/proton/disposition.h index e3b7a7800..7d7fdee3b 100644 --- a/c/include/proton/disposition.h +++ b/c/include/proton/disposition.h @@ -90,6 +90,13 @@ typedef struct pn_disposition_t pn_disposition_t; */ #define PN_MODIFIED (0x0000000000000027) +/** + * The PN_DECLARED delivery state is a terminal state + * indicating that a transaction has been declared and indicating its + * transaction identifier. + */ +#define PN_DECLARED (0x0000000000000033) + /** * The PN_TRANSACTIONAL_STATE delivery state is a non terminal state * indicating the transactional state of a delivery. 
@@ -254,6 +261,13 @@ typedef struct pn_rejected_disposition_t pn_rejected_disposition_t; */ typedef struct pn_modified_disposition_t pn_modified_disposition_t; +/** + * A transaction declared delivery disposition + * + * This represents a transaction declared disposition. + */ +typedef struct pn_declared_disposition_t pn_declared_disposition_t; + /** * A transactional delivery disposition * @@ -304,6 +318,15 @@ PN_EXTERN pn_rejected_disposition_t *pn_rejected_disposition(pn_disposition_t *d */ PN_EXTERN pn_modified_disposition_t *pn_modified_disposition(pn_disposition_t *disposition); +/** + * Convert a delivery disposition to a transaction declared disposition + * + * @param[in] disposition delivery disposition object + * @return a pointer to the transaction declared disposition or NULL + * if the disposition is not that type + */ +PN_EXTERN pn_declared_disposition_t *pn_declared_disposition(pn_disposition_t *disposition); + /** * Convert a delivery disposition to a transactional disposition * @@ -441,9 +464,24 @@ PN_EXTERN void pn_modified_disposition_set_undeliverable(pn_modified_disposition */ PN_EXTERN pn_data_t *pn_modified_disposition_annotations(pn_modified_disposition_t *disposition); +/** + * Get the transaction id for a transaction declared disposition + * + * @param[in] disposition a transaction declared disposition object + * @return the transaction id + */ +PN_EXTERN pn_bytes_t pn_declared_disposition_get_id(pn_declared_disposition_t *disposition); + +/** + * Set the transaction id for a transaction declared disposition + * + * @param[in] disposition a transaction declared disposition object + * @param[in] id the transaction id + */ +PN_EXTERN void pn_declared_disposition_set_id(pn_declared_disposition_t *disposition, pn_bytes_t id); /** - * Get the transaction id for a transactional disposition + * Get the transaction id for a transaction declared disposition * * @param[in] disposition a transactional disposition object * @return the transaction id diff 
--git a/c/src/core/emitters.h b/c/src/core/emitters.h index f20bdff10..c7abe017c 100644 --- a/c/src/core/emitters.h +++ b/c/src/core/emitters.h @@ -647,7 +647,15 @@ static inline void emit_modified_disposition(pni_emitter_t* emitter, pni_compoun } } -static inline void emit_disposition(pni_emitter_t* emitter, pni_compound_context* compound0, pn_disposition_t *disposition); +static inline void emit_declared_disposition(pni_emitter_t* emitter, pni_compound_context* compound0, pn_declared_disposition_t *disposition){ + for (bool small_encoding = true; ; small_encoding = false) { + pni_compound_context c = emit_list(emitter, compound0, small_encoding, true); + pni_compound_context compound = c; + emit_binary_bytes(emitter, &compound, disposition->id); + emit_end_list(emitter, &compound, small_encoding); + if (encode_succeeded(emitter, &compound)) break; + } +} static inline void emit_transactional_disposition(pni_emitter_t* emitter, pni_compound_context* compound0, pn_transactional_disposition_t *disposition){ for (bool small_encoding = true; ; small_encoding = false) { @@ -700,6 +708,10 @@ static inline void emit_disposition(pni_emitter_t* emitter, pni_compound_context emit_descriptor(emitter, compound0, AMQP_DESC_MODIFIED); emit_modified_disposition(emitter, compound0, &disposition->u.s_modified); return; + case PN_DISP_DECLARED: + emit_descriptor(emitter, compound0, AMQP_DESC_DECLARED); + emit_declared_disposition(emitter, compound0, &disposition->u.s_declared); + return; case PN_DISP_TRANSACTIONAL: emit_descriptor(emitter, compound0, AMQP_DESC_TRANSACTIONAL_STATE); emit_transactional_disposition(emitter, compound0, &disposition->u.s_transactional); diff --git a/c/src/core/engine-internal.h b/c/src/core/engine-internal.h index 36950de16..5bab0f7bd 100644 --- a/c/src/core/engine-internal.h +++ b/c/src/core/engine-internal.h @@ -341,6 +341,7 @@ typedef enum pn_disposition_type_t { PN_DISP_REJECTED = PN_REJECTED, PN_DISP_RELEASED = PN_RELEASED, PN_DISP_MODIFIED = 
PN_MODIFIED, + PN_DISP_DECLARED = PN_DECLARED, PN_DISP_TRANSACTIONAL = PN_TRANSACTIONAL_STATE, } pn_disposition_type_t; @@ -360,6 +361,10 @@ struct pn_modified_disposition_t { bool undeliverable; }; +struct pn_declared_disposition_t { + pn_bytes_t id; +}; + struct pn_transactional_disposition_t { pn_bytes_t id; pn_bytes_t outcome_raw; @@ -376,6 +381,7 @@ struct pn_disposition_t { struct pn_received_disposition_t s_received; struct pn_rejected_disposition_t s_rejected; struct pn_modified_disposition_t s_modified; + struct pn_declared_disposition_t s_declared; struct pn_transactional_disposition_t s_transactional; struct pn_custom_disposition_t s_custom; } u; diff --git a/c/src/core/engine.c b/c/src/core/engine.c index 21b3fa453..7d16a01e8 100644 --- a/c/src/core/engine.c +++ b/c/src/core/engine.c @@ -2013,6 +2013,13 @@ pn_modified_disposition_t *pn_modified_disposition(pn_disposition_t *disposition return &disposition->u.s_modified; } +pn_declared_disposition_t *pn_declared_disposition(pn_disposition_t *disposition) +{ + if (disposition->type==PN_DISP_EMPTY) disposition->type = PN_DISP_DECLARED; + else if (disposition->type!=PN_DISP_DECLARED) return NULL; + return &disposition->u.s_declared; +} + pn_transactional_disposition_t *pn_transactional_disposition(pn_disposition_t *disposition) { if (disposition->type==PN_DISP_EMPTY) disposition->type = PN_DISP_TRANSACTIONAL; @@ -2099,6 +2106,19 @@ pn_data_t *pn_modified_disposition_annotations(pn_modified_disposition_t *dispos return disposition->annotations; } +pn_bytes_t pn_declared_disposition_get_id(pn_declared_disposition_t *disposition) +{ + assert(disposition); + return disposition->id; +} + +void pn_declared_disposition_set_id(pn_declared_disposition_t *disposition, pn_bytes_t id) +{ + assert(disposition); + pn_bytes_free(disposition->id); + disposition->id = pn_bytes_dup(id); +} + pn_bytes_t pn_transactional_disposition_get_id(pn_transactional_disposition_t *disposition) { assert(disposition); @@ -2473,6 +2493,7 
@@ void pn_delivery_update(pn_delivery_t *delivery, uint64_t state) case PN_RECEIVED: case PN_MODIFIED: case PN_RELEASED: + case PN_DECLARED: case PN_TRANSACTIONAL_STATE: break; default: @@ -2488,6 +2509,7 @@ void pn_delivery_update(pn_delivery_t *delivery, uint64_t state) case PN_RECEIVED: case PN_MODIFIED: case PN_RELEASED: + case PN_DECLARED: case PN_TRANSACTIONAL_STATE: delivery->local.type = state; break; @@ -2859,6 +2881,7 @@ const char *pn_disposition_type_name(uint64_t d) { case PN_REJECTED: return "rejected"; case PN_RELEASED: return "released"; case PN_MODIFIED: return "modified"; + case PN_DECLARED: return "transaction_declared"; case PN_TRANSACTIONAL_STATE: return "transactional_state"; default: return "unknown"; } diff --git a/c/src/core/transport.c b/c/src/core/transport.c index 6f633511e..1b284dffd 100644 --- a/c/src/core/transport.c +++ b/c/src/core/transport.c @@ -1605,6 +1605,14 @@ static void pni_amqp_decode_disposition (uint64_t type, pn_bytes_t disp_data, pn } break; } + case AMQP_DESC_DECLARED: { + pn_bytes_t id; + pn_amqp_decode_Eze(disp_data, &id); + disp->type = PN_DISP_DECLARED; + pn_bytes_free(disp->u.s_declared.id); + disp->u.s_declared.id = pn_bytes_dup(id); + break; + } case AMQP_DESC_TRANSACTIONAL_STATE: { pn_bytes_t id; bool qoutcome; diff --git a/c/tools/codec-generator/specs.json b/c/tools/codec-generator/specs.json index 735d076ed..c77f26e1d 100644 --- a/c/tools/codec-generator/specs.json +++ b/c/tools/codec-generator/specs.json @@ -27,6 +27,7 @@ "[D.[sSR]]", "[?o?oR]", "[z?R]", + "[z]", "D.R", "D.[.....D..D.[.....RR]]", "D.[.....D..D.[R]...]", diff --git a/python/cproton.h b/python/cproton.h index 415d9c776..c2948d711 100644 --- a/python/cproton.h +++ b/python/cproton.h @@ -410,12 +410,14 @@ typedef struct pn_custom_disposition_t pn_custom_disposition_t; typedef struct pn_received_disposition_t pn_received_disposition_t; typedef struct pn_rejected_disposition_t pn_rejected_disposition_t; typedef struct pn_modified_disposition_t 
pn_modified_disposition_t; +typedef struct pn_declared_disposition_t pn_declared_disposition_t; typedef struct pn_transactional_disposition_t pn_transactional_disposition_t; pn_custom_disposition_t *pn_custom_disposition(pn_disposition_t *disposition); pn_received_disposition_t *pn_received_disposition(pn_disposition_t *disposition); pn_rejected_disposition_t *pn_rejected_disposition(pn_disposition_t *disposition); pn_modified_disposition_t *pn_modified_disposition(pn_disposition_t *disposition); +pn_declared_disposition_t *pn_declared_disposition(pn_disposition_t *disposition); pn_transactional_disposition_t *pn_transactional_disposition(pn_disposition_t *disposition); void pn_custom_disposition_set_type(pn_custom_disposition_t *disposition, uint64_t type); @@ -431,6 +433,8 @@ void pn_modified_disposition_set_failed(pn_modified_disposition_t *disposition, _Bool pn_modified_disposition_is_undeliverable(pn_modified_disposition_t *disposition); void pn_modified_disposition_set_undeliverable(pn_modified_disposition_t *disposition, _Bool undeliverable); pn_data_t *pn_modified_disposition_annotations(pn_modified_disposition_t *disposition); +pn_bytes_t pn_declared_disposition_get_id(pn_declared_disposition_t *disposition); +void pn_declared_disposition_set_id(pn_declared_disposition_t *disposition, pn_bytes_t id); pn_bytes_t pn_transactional_disposition_get_id(pn_transactional_disposition_t *disposition); void pn_transactional_disposition_set_id(pn_transactional_disposition_t *disposition, pn_bytes_t id); uint64_t pn_transactional_disposition_get_outcome_type(pn_transactional_disposition_t *disposition); @@ -664,6 +668,7 @@ int pn_transport_unbind(pn_transport_t *transport); #define PN_REJECTED ... #define PN_RELEASED ... #define PN_MODIFIED ... +#define PN_DECLARED ... #define PN_TRANSACTIONAL_STATE ... 
// Default message priority diff --git a/python/cproton.py b/python/cproton.py index f58518578..ea19dc742 100644 --- a/python/cproton.py +++ b/python/cproton.py @@ -54,7 +54,8 @@ from cproton_ffi.lib import (PN_ACCEPTED, PN_ARRAY, PN_BINARY, PN_BOOL, PN_BYTE, PN_SSL_RESUME_UNKNOWN, PN_SSL_SHA1, PN_SSL_SHA256, PN_SSL_SHA512, PN_SSL_VERIFY_PEER, PN_SSL_VERIFY_PEER_NAME, PN_STRING, PN_SYMBOL, PN_TARGET, PN_TIMEOUT, PN_TIMER_TASK, PN_TIMESTAMP, PN_TRACE_DRV, - PN_TRACE_FRM, PN_TRACE_OFF, PN_TRACE_RAW, PN_TRANSACTIONAL_STATE, PN_TRANSPORT, + PN_TRACE_FRM, PN_TRACE_OFF, PN_TRACE_RAW, PN_DECLARED, + PN_TRANSACTIONAL_STATE, PN_TRANSPORT, PN_TRANSPORT_CLOSED, PN_TRANSPORT_ERROR, PN_TRANSPORT_HEAD_CLOSED, PN_TRANSPORT_TAIL_CLOSED, PN_UBYTE, PN_UINT, PN_ULONG, PN_UNSPECIFIED, PN_USHORT, PN_UUID, PN_VERSION_MAJOR, PN_VERSION_MINOR, @@ -178,7 +179,8 @@ from cproton_ffi.lib import (PN_ACCEPTED, PN_ARRAY, PN_BINARY, PN_BOOL, PN_BYTE, pn_modified_disposition_set_failed, pn_modified_disposition_is_undeliverable, pn_modified_disposition_set_undeliverable, - pn_modified_disposition_annotations) + pn_modified_disposition_annotations, + pn_declared_disposition) def isnull(obj): @@ -774,3 +776,11 @@ def pn_transactional_disposition_get_id(disp): def pn_transactional_disposition_set_id(disp, id): return lib.pn_transactional_disposition_set_id(disp, py2bytes(id)) + + +def pn_declared_disposition_get_id(disp): + return bytes2pybytes(lib.pn_declared_disposition_get_id(disp)) + + +def pn_declared_disposition_set_id(disp, id): + return lib.pn_declared_disposition_set_id(disp, py2bytes(id)) diff --git a/python/examples/broker.py b/python/examples/broker.py index 9d2cb38f7..f439dfe49 100755 --- a/python/examples/broker.py +++ b/python/examples/broker.py @@ -27,7 +27,7 @@ import uuid from typing import Optional, Union from proton import (Condition, Delivery, Described, Disposition, DispositionType, - Endpoint, Link, Sender, Message, Terminus, TransactionalDisposition) + Endpoint, Link, Sender, 
Message, Terminus, DeclaredDisposition) from proton.handlers import MessagingHandler from proton.reactor import Container @@ -231,13 +231,13 @@ class Broker(MessagingHandler): if isinstance(body, Described): link = delivery.link d = body.descriptor - if d == "amqp:declare:list" or d == 0x31: + if d == "amqp:declare:list": # Allocate transaction id tid = self._declare_txn() self._verbose_print(f"{tid=}: Declare") - delivery.local = TransactionalDisposition(tid) + delivery.local = DeclaredDisposition(tid) link._txns.add(tid) - elif d == "amqp:discharge:list" or d == 0x32: + elif d == "amqp:discharge:list": # Always accept commit/abort! value = body.value tid = bytes(value[0]) diff --git a/python/proton/__init__.py b/python/proton/__init__.py index 67c4a66f2..b765028b9 100644 --- a/python/proton/__init__.py +++ b/python/proton/__init__.py @@ -37,7 +37,7 @@ from ._condition import Condition from ._data import UNDESCRIBED, Array, Data, Described, char, symbol, timestamp, ubyte, ushort, uint, ulong, \ byte, short, int32, float32, decimal32, decimal64, decimal128, AnnotationDict, PropertyDict, SymbolList from ._delivery import Delivery, Disposition, DispositionType, CustomDisposition, RejectedDisposition, \ - ModifiedDisposition, ReceivedDisposition, TransactionalDisposition + ModifiedDisposition, ReceivedDisposition, DeclaredDisposition, TransactionalDisposition from ._endpoints import Endpoint, Connection, Session, Link, Receiver, Sender, Terminus from ._events import Collector, Event, EventType from ._exceptions import ProtonException, MessageException, DataException, TransportException, \ @@ -58,6 +58,7 @@ __all__ = [ "CustomDisposition", "Data", "DataException", + "DeclaredDisposition", "Delivery", "Disposition", "DispositionType", diff --git a/python/proton/_delivery.py b/python/proton/_delivery.py index 91e4e28db..0c0fc3e47 100644 --- a/python/proton/_delivery.py +++ b/python/proton/_delivery.py @@ -19,7 +19,8 @@ from __future__ import annotations -from cproton 
import (PN_ACCEPTED, PN_MODIFIED, PN_RECEIVED, PN_REJECTED, PN_RELEASED, PN_TRANSACTIONAL_STATE, +from cproton import (PN_ACCEPTED, PN_MODIFIED, PN_RECEIVED, PN_REJECTED, PN_RELEASED, PN_DECLARED, + PN_TRANSACTIONAL_STATE, pn_delivery_abort, pn_delivery_aborted, pn_delivery_attachments, pn_delivery_link, pn_delivery_local, pn_delivery_local_state, pn_delivery_partial, pn_delivery_pending, pn_delivery_readable, pn_delivery_remote, @@ -46,6 +47,9 @@ from cproton import (PN_ACCEPTED, PN_MODIFIED, PN_RECEIVED, PN_REJECTED, PN_RELE pn_modified_disposition_is_undeliverable, pn_modified_disposition_set_undeliverable, pn_modified_disposition_annotations, + pn_declared_disposition, + pn_declared_disposition_get_id, + pn_declared_disposition_set_id, pn_transactional_disposition, pn_transactional_disposition_get_id, pn_transactional_disposition_set_id, @@ -106,6 +110,12 @@ class DispositionType(IntEnum): delivery being settled. """ + TRANSACTION_DECLARED = PN_DECLARED + """ + A terminal state indicating that a transaction has been + declared and containing the transaction id. 
+ """ + TRANSACTIONAL_STATE = PN_TRANSACTIONAL_STATE """ A non-terminal delivery state indicating the transactional @@ -133,6 +143,7 @@ class Disposition: REJECTED = DispositionType.REJECTED RELEASED = DispositionType.RELEASED MODIFIED = DispositionType.MODIFIED + DECLARED = DispositionType.TRANSACTION_DECLARED TRANSACTIONAL_STATE = DispositionType.TRANSACTIONAL_STATE @property @@ -151,6 +162,8 @@ class RemoteDisposition(Disposition): return super().__new__(RemoteRejectedDisposition) elif state == cls.MODIFIED: return super().__new__(RemoteModifiedDisposition) + elif state == cls.DECLARED: + return super().__new__(RemoteDeclaredDisposition) elif state == cls.TRANSACTIONAL_STATE: return super().__new__(RemoteTransactionalDisposition) else: @@ -257,6 +270,24 @@ class RemoteModifiedDisposition(RemoteDisposition): ModifiedDisposition(self._failed, self._undeliverable, self._annotations).apply_to(local_disposition) +class RemoteDeclaredDisposition(RemoteDisposition): + + def __init__(self, delivery_impl): + impl = pn_declared_disposition(pn_delivery_remote(delivery_impl)) + self._id = pn_declared_disposition_get_id(impl) + + @property + def type(self) -> Union[int, DispositionType]: + return Disposition.DECLARED + + @property + def id(self): + return self._id + + def apply_to(self, local_disposition: LocalDisposition): + DeclaredDisposition(self._id).apply_to(local_disposition) + + class RemoteTransactionalDisposition(RemoteDisposition): def __init__(self, delivery_impl): @@ -473,6 +504,28 @@ class ModifiedDisposition(LocalDisposition): obj2dat(self._annotations, pn_modified_disposition_annotations(disp)) +class DeclaredDisposition(LocalDisposition): + + def __init__(self, id): + self._id = id + + @property + def type(self) -> Union[int, DispositionType]: + return Disposition.DECLARED + + @property + def id(self): + return self._id + + @id.setter + def id(self, id): + self._id = id + + def apply_to(self, local_disposition: LocalDisposition): + disp = 
pn_declared_disposition(local_disposition._impl) + pn_declared_disposition_set_id(disp, self._id) + + class TransactionalDisposition(LocalDisposition): def __init__(self, id, outcome_type=None): diff --git a/python/proton/_reactor.py b/python/proton/_reactor.py index bff42508b..8f44d54dc 100644 --- a/python/proton/_reactor.py +++ b/python/proton/_reactor.py @@ -617,7 +617,7 @@ class Transaction: def handle_outcome(self, event): if event.delivery == self._declare: - if event.delivery.remote_state == Disposition.TRANSACTIONAL_STATE: + if event.delivery.remote_state == Disposition.DECLARED: self.id = event.delivery.remote.id self.handler.on_transaction_declared(event) elif event.delivery.remote_state == Delivery.REJECTED: diff --git a/python/tests/proton_tests/engine.py b/python/tests/proton_tests/engine.py index 6272af060..b2304c5ec 100644 --- a/python/tests/proton_tests/engine.py +++ b/python/tests/proton_tests/engine.py @@ -22,9 +22,10 @@ import gc from time import time, sleep from typing import Union -from proton import Array, Condition, Collector, Connection, Data, Delivery, Disposition, DispositionType, Endpoint, \ - Event, CustomDisposition, Link, ModifiedDisposition, PropertyDict, ReceivedDisposition, RejectedDisposition, \ - SASL, SessionException, SymbolList, Terminus, TransactionalDisposition, Transport, UNDESCRIBED, symbol +from proton import Array, Condition, Collector, Connection, Data, DeclaredDisposition, Delivery, Disposition, \ + DispositionType, Endpoint, Event, CustomDisposition, Link, ModifiedDisposition, PropertyDict, ReceivedDisposition, \ + RejectedDisposition, SASL, SessionException, SymbolList, Terminus, TransactionalDisposition, Transport, \ + UNDESCRIBED, symbol from proton.reactor import Container from . 
import common @@ -2551,6 +2552,21 @@ class NewCustomTester(DispositionTester): assert dlv.remote.data == self._data, (dlv.remote.data, self._data) +class DeclaredTester(DispositionTester): + def __init__(self, id): + self._id = id + super().__init__(Disposition.DECLARED) + + def apply(self, dlv: Delivery): + dlv.local = DeclaredDisposition(self._id) + dlv.update() + + def check(self, dlv: Delivery): + assert dlv.remote_state == self._type + assert dlv.remote.type == self._type + assert dlv.remote.id == self._id + + class TransactionalTester(DispositionTester): def __init__(self, id, outcome_type): self._id = id @@ -2649,6 +2665,9 @@ class DeliveryTest(Test): def testNewDefaultModified(self): self._testDisposition(NewDefaultModifiedTester()) + def testDeclared(self): + self._testDisposition(DeclaredTester(id=b'1324xxx')) + def testTransactional(self): self._testDisposition(TransactionalTester(id=b'1324xxx', outcome_type=Disposition.ACCEPTED)) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org