This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c4b658a Showcasing DelayedStateMonitor. Adding descriptions from the
documentation to the exceptions coming from the Pulsar Proxy/Broker. Let the
exception handler pipeline handle the retry delay
c4b658a is described below
commit c4b658ad164954550131602791435ad9ed99fb3b
Author: Daniel Blankensteiner <[email protected]>
AuthorDate: Thu Jan 26 17:51:36 2023 +0100
Showcasing DelayedStateMonitor.
Adding descriptions from the documentation to the exceptions coming from
the Pulsar Proxy/Broker.
Let the exception handler pipeline handle the retry delay
---
samples/Processing/LoggerExtensions.cs | 52 +++++++++++++++-------
samples/Processing/Worker.cs | 13 ++++--
.../Exceptions/AuthenticationException.cs | 5 ++-
src/DotPulsar/Exceptions/AuthorizationException.cs | 5 ++-
src/DotPulsar/Exceptions/ChecksumException.cs | 5 ++-
.../Exceptions/ConsumerAssignException.cs | 5 ++-
src/DotPulsar/Exceptions/ConsumerBusyException.cs | 5 ++-
.../Exceptions/ConsumerNotFoundException.cs | 5 ++-
.../Exceptions/IncompatibleSchemaException.cs | 5 ++-
.../Exceptions/InvalidTopicNameException.cs | 5 ++-
.../InvalidTransactionStatusException.cs | 5 ++-
src/DotPulsar/Exceptions/MetadataException.cs | 5 ++-
src/DotPulsar/Exceptions/NotAllowedException.cs | 5 ++-
src/DotPulsar/Exceptions/PersistenceException.cs | 5 ++-
.../ProducerBlockedQuotaExceededException.cs | 5 ++-
src/DotPulsar/Exceptions/ProducerBusyException.cs | 5 ++-
.../Exceptions/ServiceNotReadyException.cs | 5 ++-
.../Exceptions/SubscriptionNotFoundException.cs | 5 ++-
.../Exceptions/TooManyRequestsException.cs | 5 ++-
src/DotPulsar/Exceptions/TopicNotFoundException.cs | 5 ++-
.../Exceptions/TopicTerminatedException.cs | 5 ++-
.../Exceptions/TransactionConflictException.cs | 5 ++-
.../TransactionCoordinatorNotFoundException.cs | 5 ++-
.../Exceptions/UnsupportedVersionException.cs | 5 ++-
src/DotPulsar/Internal/DefaultExceptionHandler.cs | 18 +++-----
src/DotPulsar/Internal/ExceptionHandlerPipeline.cs | 14 ++++--
src/DotPulsar/Internal/PulsarClientBuilder.cs | 4 +-
src/DotPulsar/Internal/PulsarStream.cs | 30 ++++++-------
28 files changed, 169 insertions(+), 72 deletions(-)
diff --git a/samples/Processing/LoggerExtensions.cs
b/samples/Processing/LoggerExtensions.cs
index 9c46547..e728b12 100644
--- a/samples/Processing/LoggerExtensions.cs
+++ b/samples/Processing/LoggerExtensions.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -23,7 +23,28 @@ using DotPulsar.Exceptions;
public static partial class LoggerExtensions
{
- // ConsumerChangedState
+ // Output Message
+ public static void OutputMessage(this ILogger logger, IMessage<string>
message)
+ {
+ var publishedOn = message.PublishTimeAsDateTime;
+ var payload = message.Value();
+ logger.OutputMessage(publishedOn, payload);
+ }
+
+ [LoggerMessage(EventId = 0, Level = LogLevel.Information, Message =
"Received: '{payload}' published on {publishedOn}")]
+ static partial void OutputMessage(this ILogger logger, DateTime
publishedOn, string payload);
+
+ // The Pulsar Client got an exception
+ public static void PulsarClientException(this ILogger logger,
ExceptionContext exceptionContext)
+ {
+ if (exceptionContext.Exception is not ChannelNotReadyException &&
exceptionContext.Exception is not TaskCanceledException)
+ logger.PulsarClientException(exceptionContext.Exception);
+ }
+
+ [LoggerMessage(EventId = 1, Level = LogLevel.Warning, Message = "The
PulsarClient got an exception")]
+ static partial void PulsarClientException(this ILogger logger, Exception
exception);
+
+ // Consumer changed state
public static void ConsumerChangedState(this ILogger logger,
ConsumerStateChanged stateChanged)
{
var logLevel = stateChanged.ConsumerState switch
@@ -36,29 +57,28 @@ public static partial class LoggerExtensions
logger.ConsumerChangedState(logLevel, stateChanged.Consumer.Topic,
stateChanged.ConsumerState.ToString());
}
- [LoggerMessage(EventId = 0, Message = "The consumer for topic '{topic}'
changed state to '{state}'")]
+ [LoggerMessage(EventId = 2, Message = "The consumer for topic '{topic}'
changed state to '{state}'")]
static partial void ConsumerChangedState(this ILogger logger, LogLevel
logLevel, string topic, string state);
- // OutputMessage
- public static void OutputMessage(this ILogger logger, IMessage<string>
message)
+ // Consumer lost connection
+ public static ValueTask<string> ConsumerLostConnection(this ILogger
logger, IConsumer consumer, ConsumerState state, CancellationToken
cancellationToken)
{
- var publishedOn = message.PublishTimeAsDateTime;
- var payload = message.Value();
- logger.OutputMessage(publishedOn, payload);
+ logger.ConsumerLostConnection(consumer.Topic);
+ return ValueTask.FromResult("TicketId"); // Simulating a ticket-id
after sending an alert. This is optional.
}
- [LoggerMessage(EventId = 1, Level = LogLevel.Information, Message =
"Received: '{payload}' published on {publishedOn}")]
- static partial void OutputMessage(this ILogger logger, DateTime
publishedOn, string payload);
+ [LoggerMessage(EventId = 3, Level = LogLevel.Warning, Message = "The
consumer for topic '{topic}' has lost the connection")]
+ static partial void ConsumerLostConnection(this ILogger logger, string
topic);
- // PulsarClientException
- public static void PulsarClientException(this ILogger logger,
ExceptionContext exceptionContext)
+ // Consumer regained connection
+ public static ValueTask ConsumerRegainedConnection(this ILogger logger,
IConsumer consumer, ConsumerState state, string ticketId, CancellationToken
cancellationToken)
{
- if (exceptionContext.Exception is not ChannelNotReadyException)
- logger.PulsarClientException(exceptionContext.Exception);
+ logger.ConsumerRegainedConnection(consumer.Topic);
+ return ValueTask.CompletedTask; // If an alert has been opened, this
is where we can close it again
}
- [LoggerMessage(EventId = 2, Level = LogLevel.Warning, Message = "The
PulsarClient got an exception")]
- static partial void PulsarClientException(this ILogger logger, Exception
exception);
+ [LoggerMessage(EventId = 4, Level = LogLevel.Warning, Message = "The
consumer for topic '{topic}' has regained the connection")]
+ static partial void ConsumerRegainedConnection(this ILogger logger, string
topic);
}
#pragma warning restore IDE0060 // Remove unused parameter
diff --git a/samples/Processing/Worker.cs b/samples/Processing/Worker.cs
index caa9d54..e471b29 100644
--- a/samples/Processing/Worker.cs
+++ b/samples/Processing/Worker.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -27,15 +27,22 @@ public class Worker : BackgroundService
protected override async Task ExecuteAsync(CancellationToken
cancellationToken)
{
await using var client = PulsarClient.Builder()
- .ExceptionHandler(context =>
_logger.PulsarClientException(context))
+ .ExceptionHandler(_logger.PulsarClientException) // Optional
.Build(); // Connecting to pulsar://localhost:6650
await using var consumer = client.NewConsumer(Schema.String)
- .StateChangedHandler(consumerStateChanged =>
_logger.ConsumerChangedState(consumerStateChanged))
+ .StateChangedHandler(_logger.ConsumerChangedState) // Optional
.SubscriptionName("MySubscription")
.Topic("persistent://public/default/mytopic")
.Create();
+ _ = consumer.DelayedStateMonitor( // Recommended way of ignoring the
short disconnects expected when working with a distributed system
+ ConsumerState.Active, // Operational state
+ TimeSpan.FromSeconds(5), // The amount of time allowed in
non-operational state before we act
+ _logger.ConsumerLostConnection, // Invoked if we are NOT back in
operational state after 5 seconds
+ _logger.ConsumerRegainedConnection, // Invoked when we are in
operational state again
+ cancellationToken);
+
await consumer.Process(ProcessMessage, cancellationToken);
}
diff --git a/src/DotPulsar/Exceptions/AuthenticationException.cs
b/src/DotPulsar/Exceptions/AuthenticationException.cs
index ae60915..64f05ad 100644
--- a/src/DotPulsar/Exceptions/AuthenticationException.cs
+++ b/src/DotPulsar/Exceptions/AuthenticationException.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -16,6 +16,9 @@ namespace DotPulsar.Exceptions;
using System;
+/// <summary>
+/// Invalid authentication
+/// </summary>
public sealed class AuthenticationException : DotPulsarException
{
public AuthenticationException(string message) : base(message) { }
diff --git a/src/DotPulsar/Exceptions/AuthorizationException.cs
b/src/DotPulsar/Exceptions/AuthorizationException.cs
index 211e4ee..af5140f 100644
--- a/src/DotPulsar/Exceptions/AuthorizationException.cs
+++ b/src/DotPulsar/Exceptions/AuthorizationException.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -14,6 +14,9 @@
namespace DotPulsar.Exceptions;
+/// <summary>
+/// Not authorized to use resource
+/// </summary>
public sealed class AuthorizationException : DotPulsarException
{
public AuthorizationException(string message) : base(message) { }
diff --git a/src/DotPulsar/Exceptions/ChecksumException.cs
b/src/DotPulsar/Exceptions/ChecksumException.cs
index 5fd1690..d7dbd69 100644
--- a/src/DotPulsar/Exceptions/ChecksumException.cs
+++ b/src/DotPulsar/Exceptions/ChecksumException.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -14,6 +14,9 @@
namespace DotPulsar.Exceptions;
+/// <summary>
+/// Error while verifying message checksum
+/// </summary>
public sealed class ChecksumException : DotPulsarException
{
public ChecksumException(string message) : base(message) { }
diff --git a/src/DotPulsar/Exceptions/ConsumerAssignException.cs
b/src/DotPulsar/Exceptions/ConsumerAssignException.cs
index c8cf2fa..9402809 100644
--- a/src/DotPulsar/Exceptions/ConsumerAssignException.cs
+++ b/src/DotPulsar/Exceptions/ConsumerAssignException.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -14,6 +14,9 @@
namespace DotPulsar.Exceptions;
+/// <summary>
+/// Dispatcher assign consumer error
+/// </summary>
public sealed class ConsumerAssignException : DotPulsarException
{
public ConsumerAssignException(string message) : base(message) { }
diff --git a/src/DotPulsar/Exceptions/ConsumerBusyException.cs
b/src/DotPulsar/Exceptions/ConsumerBusyException.cs
index 9a858e7..48da45b 100644
--- a/src/DotPulsar/Exceptions/ConsumerBusyException.cs
+++ b/src/DotPulsar/Exceptions/ConsumerBusyException.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -14,6 +14,9 @@
namespace DotPulsar.Exceptions;
+/// <summary>
+/// Unable to subscribe/unsubscribe because other consumers are connected
+/// </summary>
public sealed class ConsumerBusyException : DotPulsarException
{
public ConsumerBusyException(string message) : base(message) { }
diff --git a/src/DotPulsar/Exceptions/ConsumerNotFoundException.cs
b/src/DotPulsar/Exceptions/ConsumerNotFoundException.cs
index f66432b..8705e79 100644
--- a/src/DotPulsar/Exceptions/ConsumerNotFoundException.cs
+++ b/src/DotPulsar/Exceptions/ConsumerNotFoundException.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -14,6 +14,9 @@
namespace DotPulsar.Exceptions;
+/// <summary>
+/// Consumer not found
+/// </summary>
public sealed class ConsumerNotFoundException : DotPulsarException
{
public ConsumerNotFoundException(string message) : base(message) { }
diff --git a/src/DotPulsar/Exceptions/IncompatibleSchemaException.cs
b/src/DotPulsar/Exceptions/IncompatibleSchemaException.cs
index 2ed83ae..33c9df9 100644
--- a/src/DotPulsar/Exceptions/IncompatibleSchemaException.cs
+++ b/src/DotPulsar/Exceptions/IncompatibleSchemaException.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -14,6 +14,9 @@
namespace DotPulsar.Exceptions;
+/// <summary>
+/// Specified schema was incompatible with topic schema
+/// </summary>
public sealed class IncompatibleSchemaException : DotPulsarException
{
public IncompatibleSchemaException(string message) : base(message) { }
diff --git a/src/DotPulsar/Exceptions/InvalidTopicNameException.cs
b/src/DotPulsar/Exceptions/InvalidTopicNameException.cs
index ff84f8b..5280f07 100644
--- a/src/DotPulsar/Exceptions/InvalidTopicNameException.cs
+++ b/src/DotPulsar/Exceptions/InvalidTopicNameException.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -14,6 +14,9 @@
namespace DotPulsar.Exceptions;
+/// <summary>
+/// The topic name is not valid
+/// </summary>
public sealed class InvalidTopicNameException : DotPulsarException
{
public InvalidTopicNameException(string message) : base(message) { }
diff --git a/src/DotPulsar/Exceptions/InvalidTransactionStatusException.cs
b/src/DotPulsar/Exceptions/InvalidTransactionStatusException.cs
index caba91a..8d3a549 100644
--- a/src/DotPulsar/Exceptions/InvalidTransactionStatusException.cs
+++ b/src/DotPulsar/Exceptions/InvalidTransactionStatusException.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -16,6 +16,9 @@ namespace DotPulsar.Exceptions;
using System;
+/// <summary>
+/// Invalid transaction status error
+/// </summary>
public sealed class InvalidTransactionStatusException : DotPulsarException
{
public InvalidTransactionStatusException(string message) : base(message) {
}
diff --git a/src/DotPulsar/Exceptions/MetadataException.cs
b/src/DotPulsar/Exceptions/MetadataException.cs
index d8906cd..9002ab4 100644
--- a/src/DotPulsar/Exceptions/MetadataException.cs
+++ b/src/DotPulsar/Exceptions/MetadataException.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -14,6 +14,9 @@
namespace DotPulsar.Exceptions;
+/// <summary>
+/// Error with ZooKeeper (ZK) or the metadata store
+/// </summary>
public sealed class MetadataException : DotPulsarException
{
public MetadataException(string message) : base(message) { }
diff --git a/src/DotPulsar/Exceptions/NotAllowedException.cs
b/src/DotPulsar/Exceptions/NotAllowedException.cs
index 8d76b87..7c67048 100644
--- a/src/DotPulsar/Exceptions/NotAllowedException.cs
+++ b/src/DotPulsar/Exceptions/NotAllowedException.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -16,6 +16,9 @@ namespace DotPulsar.Exceptions;
using System;
+/// <summary>
+/// Not allowed error
+/// </summary>
public sealed class NotAllowedException : DotPulsarException
{
public NotAllowedException(string message) : base(message) { }
diff --git a/src/DotPulsar/Exceptions/PersistenceException.cs
b/src/DotPulsar/Exceptions/PersistenceException.cs
index 2058aa2..11c9ab6 100644
--- a/src/DotPulsar/Exceptions/PersistenceException.cs
+++ b/src/DotPulsar/Exceptions/PersistenceException.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -14,6 +14,9 @@
namespace DotPulsar.Exceptions;
+/// <summary>
+/// Error writing to or reading from BookKeeper (BK)
+/// </summary>
public sealed class PersistenceException : DotPulsarException
{
public PersistenceException(string message) : base(message) { }
diff --git a/src/DotPulsar/Exceptions/ProducerBlockedQuotaExceededException.cs
b/src/DotPulsar/Exceptions/ProducerBlockedQuotaExceededException.cs
index d644a49..237fc56 100644
--- a/src/DotPulsar/Exceptions/ProducerBlockedQuotaExceededException.cs
+++ b/src/DotPulsar/Exceptions/ProducerBlockedQuotaExceededException.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -14,6 +14,9 @@
namespace DotPulsar.Exceptions;
+/// <summary>
+/// Unable to create producer because backlog quota exceeded
+/// </summary>
public sealed class ProducerBlockedQuotaExceededException : DotPulsarException
{
public ProducerBlockedQuotaExceededException(string message) :
base(message) { }
diff --git a/src/DotPulsar/Exceptions/ProducerBusyException.cs
b/src/DotPulsar/Exceptions/ProducerBusyException.cs
index 5e573af..71fb1a5 100644
--- a/src/DotPulsar/Exceptions/ProducerBusyException.cs
+++ b/src/DotPulsar/Exceptions/ProducerBusyException.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -14,6 +14,9 @@
namespace DotPulsar.Exceptions;
+/// <summary>
+/// Producer with same name is already connected
+/// </summary>
public sealed class ProducerBusyException : DotPulsarException
{
public ProducerBusyException(string message) : base(message) { }
diff --git a/src/DotPulsar/Exceptions/ServiceNotReadyException.cs
b/src/DotPulsar/Exceptions/ServiceNotReadyException.cs
index 053aed7..a2e2ee5 100644
--- a/src/DotPulsar/Exceptions/ServiceNotReadyException.cs
+++ b/src/DotPulsar/Exceptions/ServiceNotReadyException.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -14,6 +14,9 @@
namespace DotPulsar.Exceptions;
+/// <summary>
+/// Any error that requires client retry operation with a fresh lookup
+/// </summary>
public sealed class ServiceNotReadyException : DotPulsarException
{
public ServiceNotReadyException(string message) : base(message) { }
diff --git a/src/DotPulsar/Exceptions/SubscriptionNotFoundException.cs
b/src/DotPulsar/Exceptions/SubscriptionNotFoundException.cs
index 64a7415..19684d6 100644
--- a/src/DotPulsar/Exceptions/SubscriptionNotFoundException.cs
+++ b/src/DotPulsar/Exceptions/SubscriptionNotFoundException.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -14,6 +14,9 @@
namespace DotPulsar.Exceptions;
+/// <summary>
+/// Subscription not found
+/// </summary>
public sealed class SubscriptionNotFoundException : DotPulsarException
{
public SubscriptionNotFoundException(string message) : base(message) { }
diff --git a/src/DotPulsar/Exceptions/TooManyRequestsException.cs
b/src/DotPulsar/Exceptions/TooManyRequestsException.cs
index 0d67632..ebaae3b 100644
--- a/src/DotPulsar/Exceptions/TooManyRequestsException.cs
+++ b/src/DotPulsar/Exceptions/TooManyRequestsException.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -14,6 +14,9 @@
namespace DotPulsar.Exceptions;
+/// <summary>
+/// Error caused by too many simultaneous requests
+/// </summary>
public sealed class TooManyRequestsException : DotPulsarException
{
public TooManyRequestsException(string message) : base(message) { }
diff --git a/src/DotPulsar/Exceptions/TopicNotFoundException.cs
b/src/DotPulsar/Exceptions/TopicNotFoundException.cs
index c2585c5..0c9881a 100644
--- a/src/DotPulsar/Exceptions/TopicNotFoundException.cs
+++ b/src/DotPulsar/Exceptions/TopicNotFoundException.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -14,6 +14,9 @@
namespace DotPulsar.Exceptions;
+/// <summary>
+/// Topic not found
+/// </summary>
public sealed class TopicNotFoundException : DotPulsarException
{
public TopicNotFoundException(string message) : base(message) { }
diff --git a/src/DotPulsar/Exceptions/TopicTerminatedException.cs
b/src/DotPulsar/Exceptions/TopicTerminatedException.cs
index 076f0d3..8002753 100644
--- a/src/DotPulsar/Exceptions/TopicTerminatedException.cs
+++ b/src/DotPulsar/Exceptions/TopicTerminatedException.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -14,6 +14,9 @@
namespace DotPulsar.Exceptions;
+/// <summary>
+/// The topic has been terminated
+/// </summary>
public sealed class TopicTerminatedException : DotPulsarException
{
public TopicTerminatedException(string message) : base(message) { }
diff --git a/src/DotPulsar/Exceptions/TransactionConflictException.cs
b/src/DotPulsar/Exceptions/TransactionConflictException.cs
index 1a83c10..a138f0f 100644
--- a/src/DotPulsar/Exceptions/TransactionConflictException.cs
+++ b/src/DotPulsar/Exceptions/TransactionConflictException.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -16,6 +16,9 @@ namespace DotPulsar.Exceptions;
using System;
+/// <summary>
+/// Ack with transaction conflict
+/// </summary>
public sealed class TransactionConflictException : DotPulsarException
{
public TransactionConflictException(string message) : base(message) { }
diff --git
a/src/DotPulsar/Exceptions/TransactionCoordinatorNotFoundException.cs
b/src/DotPulsar/Exceptions/TransactionCoordinatorNotFoundException.cs
index c79117a..8f9c0f2 100644
--- a/src/DotPulsar/Exceptions/TransactionCoordinatorNotFoundException.cs
+++ b/src/DotPulsar/Exceptions/TransactionCoordinatorNotFoundException.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -16,6 +16,9 @@ namespace DotPulsar.Exceptions;
using System;
+/// <summary>
+/// Transaction coordinator not found error
+/// </summary>
public sealed class TransactionCoordinatorNotFoundException :
DotPulsarException
{
public TransactionCoordinatorNotFoundException(string message) :
base(message) { }
diff --git a/src/DotPulsar/Exceptions/UnsupportedVersionException.cs
b/src/DotPulsar/Exceptions/UnsupportedVersionException.cs
index 28594a4..4c45054 100644
--- a/src/DotPulsar/Exceptions/UnsupportedVersionException.cs
+++ b/src/DotPulsar/Exceptions/UnsupportedVersionException.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -14,6 +14,9 @@
namespace DotPulsar.Exceptions;
+/// <summary>
+/// Error when an older client/version doesn't support a required feature
+/// </summary>
public sealed class UnsupportedVersionException : DotPulsarException
{
public UnsupportedVersionException(string message) : base(message) { }
diff --git a/src/DotPulsar/Internal/DefaultExceptionHandler.cs
b/src/DotPulsar/Internal/DefaultExceptionHandler.cs
index 49c8027..0c6d59e 100644
--- a/src/DotPulsar/Internal/DefaultExceptionHandler.cs
+++ b/src/DotPulsar/Internal/DefaultExceptionHandler.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -25,19 +25,15 @@ using System.Threading.Tasks;
public sealed class DefaultExceptionHandler : IHandleException
{
- private readonly TimeSpan _retryInterval;
-
- public DefaultExceptionHandler(TimeSpan retryInterval)
- => _retryInterval = retryInterval;
-
- public async ValueTask OnException(ExceptionContext exceptionContext)
+ public ValueTask OnException(ExceptionContext exceptionContext)
{
exceptionContext.Result =
DetermineFaultAction(exceptionContext.Exception,
exceptionContext.CancellationToken);
-
- if (exceptionContext.Result == FaultAction.Retry)
- await Task.Delay(_retryInterval,
exceptionContext.CancellationToken).ConfigureAwait(false);
-
exceptionContext.ExceptionHandled = true;
+#if NET6_0_OR_GREATER
+ return ValueTask.CompletedTask;
+#else
+ return new ValueTask();
+#endif
}
private static FaultAction DetermineFaultAction(Exception exception,
CancellationToken cancellationToken)
diff --git a/src/DotPulsar/Internal/ExceptionHandlerPipeline.cs
b/src/DotPulsar/Internal/ExceptionHandlerPipeline.cs
index 00d943e..f6e0b7d 100644
--- a/src/DotPulsar/Internal/ExceptionHandlerPipeline.cs
+++ b/src/DotPulsar/Internal/ExceptionHandlerPipeline.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -15,6 +15,7 @@
namespace DotPulsar.Internal;
using DotPulsar.Abstractions;
+using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
@@ -22,9 +23,13 @@ using System.Threading.Tasks;
public sealed class ExceptionHandlerPipeline : IHandleException
{
private readonly IHandleException[] _handlers;
+ private readonly TimeSpan _retryInterval;
- public ExceptionHandlerPipeline(IEnumerable<IHandleException> handlers)
- => _handlers = handlers.ToArray();
+ public ExceptionHandlerPipeline(TimeSpan retryInterval,
IEnumerable<IHandleException> handlers)
+ {
+ _retryInterval = retryInterval;
+ _handlers = handlers.ToArray();
+ }
public async ValueTask OnException(ExceptionContext exceptionContext)
{
@@ -35,5 +40,8 @@ public sealed class ExceptionHandlerPipeline :
IHandleException
if (exceptionContext.ExceptionHandled)
break;
}
+
+ if (exceptionContext.Result == FaultAction.Retry)
+ await Task.Delay(_retryInterval,
exceptionContext.CancellationToken).ConfigureAwait(false);
}
}
diff --git a/src/DotPulsar/Internal/PulsarClientBuilder.cs
b/src/DotPulsar/Internal/PulsarClientBuilder.cs
index 373667f..0aaee42 100644
--- a/src/DotPulsar/Internal/PulsarClientBuilder.cs
+++ b/src/DotPulsar/Internal/PulsarClientBuilder.cs
@@ -171,8 +171,8 @@ public sealed class PulsarClientBuilder :
IPulsarClientBuilder
var connector = new Connector(_clientCertificates,
_trustedCertificateAuthority, _verifyCertificateAuthority,
_verifyCertificateName, _checkCertificateRevocation);
- var exceptionHandlers = new List<IHandleException>(_exceptionHandlers)
{ new DefaultExceptionHandler(_retryInterval) };
- var exceptionHandlerPipeline = new
ExceptionHandlerPipeline(exceptionHandlers);
+ var exceptionHandlers = new List<IHandleException>(_exceptionHandlers)
{ new DefaultExceptionHandler() };
+ var exceptionHandlerPipeline = new
ExceptionHandlerPipeline(_retryInterval, exceptionHandlers);
var connectionPool = new ConnectionPool(_commandConnect, _serviceUrl,
connector, _encryptionPolicy.Value, _closeInactiveConnectionsInterval,
_listenerName, _keepAliveInterval, _authentication);
var processManager = new ProcessManager(connectionPool);
diff --git a/src/DotPulsar/Internal/PulsarStream.cs
b/src/DotPulsar/Internal/PulsarStream.cs
index af41066..13488bc 100644
--- a/src/DotPulsar/Internal/PulsarStream.cs
+++ b/src/DotPulsar/Internal/PulsarStream.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -28,11 +28,11 @@ using System.Threading.Tasks;
public sealed class PulsarStream : IPulsarStream
{
- private const int _frameSizePrefix = 4;
- private const int _unknownFrameSize = 0;
- private const long _pauseAtMoreThan10Mb = 10485760;
- private const long _resumeAt5MbOrLess = 5242881;
- private const int _chunkSize = 75000;
+ private const int FrameSizePrefix = 4;
+ private const int UnknownFrameSize = 0;
+ private const long PauseAtMoreThan10Mb = 10485760;
+ private const long ResumeAt5MbOrLess = 5242881;
+ private const int ChunkSize = 75000;
private readonly Stream _stream;
private readonly ChunkingPipeline _pipeline;
@@ -43,8 +43,8 @@ public sealed class PulsarStream : IPulsarStream
public PulsarStream(Stream stream)
{
_stream = stream;
- _pipeline = new ChunkingPipeline(stream, _chunkSize);
- var options = new PipeOptions(pauseWriterThreshold:
_pauseAtMoreThan10Mb, resumeWriterThreshold: _resumeAt5MbOrLess);
+ _pipeline = new ChunkingPipeline(stream, ChunkSize);
+ var options = new PipeOptions(pauseWriterThreshold:
PauseAtMoreThan10Mb, resumeWriterThreshold: ResumeAt5MbOrLess);
var pipe = new Pipe(options);
_reader = pipe.Reader;
_writer = pipe.Writer;
@@ -119,33 +119,33 @@ public sealed class PulsarStream : IPulsarStream
try
{
- var frameSize = _unknownFrameSize;
+ var frameSize = UnknownFrameSize;
var totalSize = 0;
while (true)
{
- var minimumSize = _frameSizePrefix + frameSize;
+ var minimumSize = FrameSizePrefix + frameSize;
var readResult = await _reader.ReadAtLeastAsync(minimumSize,
cancellationToken).ConfigureAwait(false);
var buffer = readResult.Buffer;
while (true)
{
- if (buffer.Length < _frameSizePrefix)
+ if (buffer.Length < FrameSizePrefix)
break;
- if (frameSize == _unknownFrameSize)
+ if (frameSize == UnknownFrameSize)
{
frameSize = (int) buffer.ReadUInt32(0, true);
- totalSize = _frameSizePrefix + frameSize;
+ totalSize = FrameSizePrefix + frameSize;
}
if (buffer.Length < totalSize)
break;
- yield return buffer.Slice(_frameSizePrefix, frameSize);
+ yield return buffer.Slice(FrameSizePrefix, frameSize);
buffer = buffer.Slice(totalSize);
- frameSize = _unknownFrameSize;
+ frameSize = UnknownFrameSize;
}
if (readResult.IsCompleted)