This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new cebfdf6 Adding check for message size when sending with the producer
cebfdf6 is described below
commit cebfdf6aeb24e84e977c298b0188dcbef0022496
Author: Daniel Blankensteiner <[email protected]>
AuthorDate: Fri Jan 12 12:34:52 2024 +0100
Adding check for message size when sending with the producer
---
CHANGELOG.md | 7 +++++
.../Exceptions/TooLargeMessageException.cs | 31 ++++++++++++++++++++++
src/DotPulsar/Internal/Connection.cs | 23 +++++++++-------
3 files changed, 52 insertions(+), 9 deletions(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 4ec01df..a6e514d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,13 @@ All notable changes to this project will be documented in this
file.
The format is based on [Keep a
Changelog](https://keepachangelog.com/en/1.1.0/) and this project adheres to
[Semantic Versioning](https://semver.org/spec/v2.0.0.html).
+## [Unreleased]
+
+### Fixed
+
+- When sending a message that is too large the broker will close the tcp
connection, causing the producer to disconnect, reconnect, and retry in an
endless loop.
+ Now a TooLargeMessageException is thrown (will be given to the exception
handler) and the producer's state is changed to 'Faulted'
+
## [3.1.1] - 2023-12-11
### Fixed
diff --git a/src/DotPulsar/Exceptions/TooLargeMessageException.cs
b/src/DotPulsar/Exceptions/TooLargeMessageException.cs
new file mode 100644
index 0000000..7b42e30
--- /dev/null
+++ b/src/DotPulsar/Exceptions/TooLargeMessageException.cs
@@ -0,0 +1,31 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Exceptions;
+
+/// <summary>
+/// Exception thrown when trying to send a message that exceeds the max
message size
+/// </summary>
+public sealed class TooLargeMessageException : DotPulsarException
+{
+ public TooLargeMessageException(int messageSize, int maxMessageSize)
+ : base($"The message size is {messageSize} bytes but the maximum
message size allowed is {maxMessageSize} bytes")
+ {
+ MessageSize = messageSize;
+ MaxMessageSize = maxMessageSize;
+ }
+
+ public int MessageSize { get; }
+ public int MaxMessageSize { get; }
+}
diff --git a/src/DotPulsar/Internal/Connection.cs
b/src/DotPulsar/Internal/Connection.cs
index b8ce044..1c7771a 100644
--- a/src/DotPulsar/Internal/Connection.cs
+++ b/src/DotPulsar/Internal/Connection.cs
@@ -15,6 +15,7 @@
namespace DotPulsar.Internal;
using DotPulsar.Abstractions;
+using DotPulsar.Exceptions;
using DotPulsar.Extensions;
using DotPulsar.Internal.Abstractions;
using DotPulsar.Internal.Exceptions;
@@ -241,19 +242,23 @@ public sealed class Connection : IConnection
{
ThrowIfDisposed();
- using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
+ try
{
- try
+ var sequence =
Serializer.Serialize(command.Command!.AsBaseCommand(), command.Metadata!,
command.Payload);
+
+ if (sequence.Length > MaxMessageSize)
+ throw new TooLargeMessageException((int) sequence.Length,
MaxMessageSize);
+
+ using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
{
_channelManager.Outgoing(command.Command!, responseTcs);
+ await _stream.Send(sequence).ConfigureAwait(false);
}
- catch (OperationCanceledException)
- {
- responseTcs.TrySetCanceled();
- throw;
- }
- var sequence =
Serializer.Serialize(command.Command!.AsBaseCommand(), command.Metadata!,
command.Payload);
- await _stream.Send(sequence).ConfigureAwait(false);
+ }
+ catch (OperationCanceledException)
+ {
+ _ = responseTcs.TrySetCanceled();
+ throw;
}
}