This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new cebfdf6  Adding check for message size when sending with the producer
cebfdf6 is described below

commit cebfdf6aeb24e84e977c298b0188dcbef0022496
Author: Daniel Blankensteiner <[email protected]>
AuthorDate: Fri Jan 12 12:34:52 2024 +0100

    Adding check for message size when sending with the producer
---
 CHANGELOG.md                                       |  7 +++++
 .../Exceptions/TooLargeMessageException.cs         | 31 ++++++++++++++++++++++
 src/DotPulsar/Internal/Connection.cs               | 23 +++++++++-------
 3 files changed, 52 insertions(+), 9 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 4ec01df..a6e514d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,13 @@ All notable changes to this project will be documented in this 
file.
 
 The format is based on [Keep a 
Changelog](https://keepachangelog.com/en/1.1.0/) and this project adheres to 
[Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
+## [Unreleased]
+
+### Fixed
+
+- When sending a message that is too large, the broker will close the TCP 
connection, causing the producer to disconnect, reconnect, and retry in an 
endless loop.
+  Now a TooLargeMessageException is thrown (it will be given to the exception 
handler) and the producer's state is changed to 'Faulted'.
+
 ## [3.1.1] - 2023-12-11
 
 ### Fixed
diff --git a/src/DotPulsar/Exceptions/TooLargeMessageException.cs 
b/src/DotPulsar/Exceptions/TooLargeMessageException.cs
new file mode 100644
index 0000000..7b42e30
--- /dev/null
+++ b/src/DotPulsar/Exceptions/TooLargeMessageException.cs
@@ -0,0 +1,31 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Exceptions;
+
+/// <summary>
+/// Exception thrown when trying to send a message that exceeds the max 
message size
+/// </summary>
+public sealed class TooLargeMessageException : DotPulsarException
+{
+    public TooLargeMessageException(int messageSize, int maxMessageSize)
+        : base($"The message size is {messageSize} bytes but the maximum 
message size allowed is {maxMessageSize} bytes")
+    {
+        MessageSize = messageSize;
+        MaxMessageSize = maxMessageSize;
+    }
+
+    public int MessageSize { get; }
+    public int MaxMessageSize { get; }
+}
diff --git a/src/DotPulsar/Internal/Connection.cs 
b/src/DotPulsar/Internal/Connection.cs
index b8ce044..1c7771a 100644
--- a/src/DotPulsar/Internal/Connection.cs
+++ b/src/DotPulsar/Internal/Connection.cs
@@ -15,6 +15,7 @@
 namespace DotPulsar.Internal;
 
 using DotPulsar.Abstractions;
+using DotPulsar.Exceptions;
 using DotPulsar.Extensions;
 using DotPulsar.Internal.Abstractions;
 using DotPulsar.Internal.Exceptions;
@@ -241,19 +242,23 @@ public sealed class Connection : IConnection
     {
         ThrowIfDisposed();
 
-        using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
+        try
         {
-            try
+            var sequence = 
Serializer.Serialize(command.Command!.AsBaseCommand(), command.Metadata!, 
command.Payload);
+
+            if (sequence.Length > MaxMessageSize)
+                throw new TooLargeMessageException((int) sequence.Length, 
MaxMessageSize);
+
+            using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
             {
                 _channelManager.Outgoing(command.Command!, responseTcs);
+                await _stream.Send(sequence).ConfigureAwait(false);
             }
-            catch (OperationCanceledException)
-            {
-                responseTcs.TrySetCanceled();
-                throw;
-            }
-            var sequence = 
Serializer.Serialize(command.Command!.AsBaseCommand(), command.Metadata!, 
command.Payload);
-            await _stream.Send(sequence).ConfigureAwait(false);
+        }
+        catch (OperationCanceledException)
+        {
+            _ = responseTcs.TrySetCanceled();
+            throw;
         }
     }
 

Reply via email to