This is an automated email from the ASF dual-hosted git repository. havret pushed a commit to branch feature_add_public_api_for_nack in repository https://gitbox.apache.org/repos/asf/activemq-nms-amqp.git
commit c778e465d6ac8ba9aaf5d8abc0b0f4af00a37d37 Author: Havret <[email protected]> AuthorDate: Sun Mar 26 22:20:57 2023 +0200 AMQNET-831 Add option to change ack mode via message properties --- src/NMS.AMQP/Message/NmsMessage.cs | 6 +- .../Message/NmsMessagePropertiesInterceptor.cs | 77 ++++++++++++++++++++++ .../AmqpAcknowledgmentsIntegrationTest.cs | 8 +-- .../Apache-NMS-AMQP-Test/Message/NmsMessageTest.cs | 2 +- 4 files changed, 84 insertions(+), 9 deletions(-) diff --git a/src/NMS.AMQP/Message/NmsMessage.cs b/src/NMS.AMQP/Message/NmsMessage.cs index c56854c..c620599 100644 --- a/src/NMS.AMQP/Message/NmsMessage.cs +++ b/src/NMS.AMQP/Message/NmsMessage.cs @@ -25,7 +25,7 @@ namespace Apache.NMS.AMQP.Message { public class NmsMessage : IMessage { - private MessagePropertyIntercepter properties; + private NmsMessagePropertiesInterceptor properties; private bool readOnlyProperties; public NmsMessage(INmsMessageFacade facade) @@ -35,9 +35,7 @@ namespace Apache.NMS.AMQP.Message public INmsMessageFacade Facade { get; } - public IPrimitiveMap Properties => properties ?? - (properties = new MessagePropertyIntercepter(this, Facade.Properties, - IsReadOnlyProperties)); + public IPrimitiveMap Properties => properties ?? (properties = new NmsMessagePropertiesInterceptor(this, IsReadOnlyProperties)); public string NMSCorrelationID { diff --git a/src/NMS.AMQP/Message/NmsMessagePropertiesInterceptor.cs b/src/NMS.AMQP/Message/NmsMessagePropertiesInterceptor.cs new file mode 100644 index 0000000..28ea453 --- /dev/null +++ b/src/NMS.AMQP/Message/NmsMessagePropertiesInterceptor.cs @@ -0,0 +1,77 @@ +using System; +using System.ComponentModel; +using Apache.NMS.Util; + +namespace Apache.NMS.AMQP.Message +{ + internal class NmsMessagePropertiesInterceptor : PrimitiveMapInterceptor + { + private const string NMSX_GROUP_ID = "NMSXGroupId"; + private const string NMSX_GROUP_SEQ = "NMSXGroupSeq"; + private const string NMS_AMQP_ACK_TYPE = "NMS_AMQP_ACK_TYPE"; + + private readonly NmsMessage nmsMessage; + + public NmsMessagePropertiesInterceptor(NmsMessage nmsMessage, bool isReadOnlyProperties) : base(nmsMessage, nmsMessage.Facade.Properties, isReadOnlyProperties) + { + this.nmsMessage = nmsMessage; + } + + protected override object GetObjectProperty(string name) + { + CheckPropertyNameIsValid(name); + + switch (name) + { + case NMSX_GROUP_ID: + return this.nmsMessage.Facade.GroupId; + case NMSX_GROUP_SEQ: + return this.nmsMessage.Facade.GroupSequence; + case NMS_AMQP_ACK_TYPE: + return this.nmsMessage.NmsAcknowledgeCallback?.AcknowledgementType; + default: + return base.GetObjectProperty(name); + } + } + + protected override void SetObjectProperty(string name, object value) + { + CheckPropertyNameIsValid(name); + + switch (name) + { + case NMSX_GROUP_ID when value is string groupId: + this.nmsMessage.Facade.GroupId = groupId; + break; + case NMSX_GROUP_SEQ: + var groupSequence = Convert.ToUInt32(value); + this.nmsMessage.Facade.GroupSequence = groupSequence; + break; + case NMS_AMQP_ACK_TYPE: + if (this.nmsMessage.NmsAcknowledgeCallback == null) + { + throw new NMSException($"Session Acknowledgement Mode does not allow setting: {NMS_AMQP_ACK_TYPE}"); + } + var ackType = Convert.ToInt32(value); + this.nmsMessage.NmsAcknowledgeCallback.AcknowledgementType = (AckType) ackType; + break; + default: + base.SetObjectProperty(name, value); + break; + } + } + + private static void CheckPropertyNameIsValid(string name) + { + if (name == null) + { + throw new ArgumentNullException(paramName: nameof(name), message: "Property name must not be null"); + } + + if (name.Length == 0) + { + throw new ArgumentException(message: "Property name must not be the empty string", paramName: nameof(name)); + } + } + } +} \ No newline at end of file diff --git a/test/Apache-NMS-AMQP-Test/Integration/AmqpAcknowledgmentsIntegrationTest.cs b/test/Apache-NMS-AMQP-Test/Integration/AmqpAcknowledgmentsIntegrationTest.cs index 505e133..69d186f 100644 --- a/test/Apache-NMS-AMQP-Test/Integration/AmqpAcknowledgmentsIntegrationTest.cs +++ b/test/Apache-NMS-AMQP-Test/Integration/AmqpAcknowledgmentsIntegrationTest.cs @@ -324,8 +324,8 @@ namespace NMS.AMQP.Test.Integration uint deliveryNumber = (uint) message.Properties.GetInt(TestAmqpPeer.MESSAGE_NUMBER) + 1; testPeer.ExpectDisposition(settled: true, stateMatcher: dispositionMatcherFailed, firstDeliveryId: deliveryNumber, lastDeliveryId: deliveryNumber); - ((NmsMessage) message).NmsAcknowledgeCallback.AcknowledgementType = AckType.MODIFIED_FAILED_UNDELIVERABLE; - + message.Properties["NMS_AMQP_ACK_TYPE"] = AckType.MODIFIED_FAILED_UNDELIVERABLE; + message.Acknowledge(); testPeer.WaitForAllMatchersToComplete(3000); @@ -387,7 +387,7 @@ namespace NMS.AMQP.Test.Integration uint deliveryNumber = (uint) message.Properties.GetInt(TestAmqpPeer.MESSAGE_NUMBER) + 1; testPeer.ExpectDisposition(settled: true, stateMatcher: dispositionMatcherFailed, firstDeliveryId: deliveryNumber, lastDeliveryId: deliveryNumber); - ((NmsMessage) message).NmsAcknowledgeCallback.AcknowledgementType = AckType.REJECTED; + message.Properties["NMS_AMQP_ACK_TYPE"] = AckType.REJECTED; message.Acknowledge(); @@ -450,7 +450,7 @@ namespace NMS.AMQP.Test.Integration uint deliveryNumber = (uint) message.Properties.GetInt(TestAmqpPeer.MESSAGE_NUMBER) + 1; testPeer.ExpectDisposition(settled: true, stateMatcher: dispositionMatcherFailed, firstDeliveryId: deliveryNumber, lastDeliveryId: deliveryNumber); - ((NmsMessage) message).NmsAcknowledgeCallback.AcknowledgementType = AckType.MODIFIED_FAILED; + message.Properties["NMS_AMQP_ACK_TYPE"] = AckType.MODIFIED_FAILED; message.Acknowledge(); diff --git a/test/Apache-NMS-AMQP-Test/Message/NmsMessageTest.cs b/test/Apache-NMS-AMQP-Test/Message/NmsMessageTest.cs index 308bc53..ccb3dd0 100644 --- a/test/Apache-NMS-AMQP-Test/Message/NmsMessageTest.cs +++ b/test/Apache-NMS-AMQP-Test/Message/NmsMessageTest.cs @@ -349,7 +349,7 @@ namespace NMS.AMQP.Test.Message { NmsMessage msg = factory.CreateMessage(); - Assert.Throws<ArgumentNullException>(() => msg.Properties.SetString(null, "asd")); + Assert.Throws<ArgumentException>(() => msg.Properties.SetString("", "asd")); } [Test]
