This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new c4b658a  Showcasing DelayedStateMonitor. Adding descriptions from the 
documentation to the exceptions coming from the Pulsar Proxy/Broker. Let the 
exception handler pipeline handle the retry delay
c4b658a is described below

commit c4b658ad164954550131602791435ad9ed99fb3b
Author: Daniel Blankensteiner <[email protected]>
AuthorDate: Thu Jan 26 17:51:36 2023 +0100

    Showcasing DelayedStateMonitor.
    Adding descriptions from the documentation to the exceptions coming from 
the Pulsar Proxy/Broker.
    Let the exception handler pipeline handle the retry delay
---
 samples/Processing/LoggerExtensions.cs             | 52 +++++++++++++++-------
 samples/Processing/Worker.cs                       | 13 ++++--
 .../Exceptions/AuthenticationException.cs          |  5 ++-
 src/DotPulsar/Exceptions/AuthorizationException.cs |  5 ++-
 src/DotPulsar/Exceptions/ChecksumException.cs      |  5 ++-
 .../Exceptions/ConsumerAssignException.cs          |  5 ++-
 src/DotPulsar/Exceptions/ConsumerBusyException.cs  |  5 ++-
 .../Exceptions/ConsumerNotFoundException.cs        |  5 ++-
 .../Exceptions/IncompatibleSchemaException.cs      |  5 ++-
 .../Exceptions/InvalidTopicNameException.cs        |  5 ++-
 .../InvalidTransactionStatusException.cs           |  5 ++-
 src/DotPulsar/Exceptions/MetadataException.cs      |  5 ++-
 src/DotPulsar/Exceptions/NotAllowedException.cs    |  5 ++-
 src/DotPulsar/Exceptions/PersistenceException.cs   |  5 ++-
 .../ProducerBlockedQuotaExceededException.cs       |  5 ++-
 src/DotPulsar/Exceptions/ProducerBusyException.cs  |  5 ++-
 .../Exceptions/ServiceNotReadyException.cs         |  5 ++-
 .../Exceptions/SubscriptionNotFoundException.cs    |  5 ++-
 .../Exceptions/TooManyRequestsException.cs         |  5 ++-
 src/DotPulsar/Exceptions/TopicNotFoundException.cs |  5 ++-
 .../Exceptions/TopicTerminatedException.cs         |  5 ++-
 .../Exceptions/TransactionConflictException.cs     |  5 ++-
 .../TransactionCoordinatorNotFoundException.cs     |  5 ++-
 .../Exceptions/UnsupportedVersionException.cs      |  5 ++-
 src/DotPulsar/Internal/DefaultExceptionHandler.cs  | 18 +++-----
 src/DotPulsar/Internal/ExceptionHandlerPipeline.cs | 14 ++++--
 src/DotPulsar/Internal/PulsarClientBuilder.cs      |  4 +-
 src/DotPulsar/Internal/PulsarStream.cs             | 30 ++++++-------
 28 files changed, 169 insertions(+), 72 deletions(-)

diff --git a/samples/Processing/LoggerExtensions.cs 
b/samples/Processing/LoggerExtensions.cs
index 9c46547..e728b12 100644
--- a/samples/Processing/LoggerExtensions.cs
+++ b/samples/Processing/LoggerExtensions.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -23,7 +23,28 @@ using DotPulsar.Exceptions;
 
 public static partial class LoggerExtensions
 {
-    // ConsumerChangedState
+    // Output Message
+    public static void OutputMessage(this ILogger logger, IMessage<string> 
message)
+    {
+        var publishedOn = message.PublishTimeAsDateTime;
+        var payload = message.Value();
+        logger.OutputMessage(publishedOn, payload);
+    }
+
+    [LoggerMessage(EventId = 0, Level = LogLevel.Information, Message = 
"Received: '{payload}' published on {publishedOn}")]
+    static partial void OutputMessage(this ILogger logger, DateTime 
publishedOn, string payload);
+
+    // The Pulsar Client got an exception
+    public static void PulsarClientException(this ILogger logger, 
ExceptionContext exceptionContext)
+    {
+        if (exceptionContext.Exception is not ChannelNotReadyException && 
exceptionContext.Exception is not TaskCanceledException)
+            logger.PulsarClientException(exceptionContext.Exception);
+    }
+
+    [LoggerMessage(EventId = 1, Level = LogLevel.Warning, Message = "The 
PulsarClient got an exception")]
+    static partial void PulsarClientException(this ILogger logger, Exception 
exception);
+
+    // Consumer changed state
     public static void ConsumerChangedState(this ILogger logger, 
ConsumerStateChanged stateChanged)
     {
         var logLevel = stateChanged.ConsumerState switch
@@ -36,29 +57,28 @@ public static partial class LoggerExtensions
         logger.ConsumerChangedState(logLevel, stateChanged.Consumer.Topic, 
stateChanged.ConsumerState.ToString());
     }
 
-    [LoggerMessage(EventId = 0, Message = "The consumer for topic '{topic}' 
changed state to '{state}'")]
+    [LoggerMessage(EventId = 2, Message = "The consumer for topic '{topic}' 
changed state to '{state}'")]
     static partial void ConsumerChangedState(this ILogger logger, LogLevel 
logLevel, string topic, string state);
 
-    // OutputMessage
-    public static void OutputMessage(this ILogger logger, IMessage<string> 
message)
+    // Consumer lost connection
+    public static ValueTask<string> ConsumerLostConnection(this ILogger 
logger, IConsumer consumer, ConsumerState state, CancellationToken 
cancellationToken)
     {
-        var publishedOn = message.PublishTimeAsDateTime;
-        var payload = message.Value();
-        logger.OutputMessage(publishedOn, payload);
+        logger.ConsumerLostConnection(consumer.Topic);
+        return ValueTask.FromResult("TicketId"); // Simulating a ticket-id 
after sending an alert. This is optional.
     }
 
-    [LoggerMessage(EventId = 1, Level = LogLevel.Information, Message = 
"Received: '{payload}' published on {publishedOn}")]
-    static partial void OutputMessage(this ILogger logger, DateTime 
publishedOn, string payload);
+    [LoggerMessage(EventId = 3, Level = LogLevel.Warning, Message = "The 
consumer for topic '{topic}' has lost the connection")]
+    static partial void ConsumerLostConnection(this ILogger logger, string 
topic);
 
-    // PulsarClientException
-    public static void PulsarClientException(this ILogger logger, 
ExceptionContext exceptionContext)
+    // Consumer regained connection
+    public static ValueTask ConsumerRegainedConnection(this ILogger logger, 
IConsumer consumer, ConsumerState state, string ticketId, CancellationToken 
cancellationToken)
     {
-        if (exceptionContext.Exception is not ChannelNotReadyException)
-            logger.PulsarClientException(exceptionContext.Exception);
+        logger.ConsumerRegainedConnection(consumer.Topic);
+        return ValueTask.CompletedTask;  // If an alert has been opened, this 
is where we can close it again
     }
 
-    [LoggerMessage(EventId = 2, Level = LogLevel.Warning, Message = "The 
PulsarClient got an exception")]
-    static partial void PulsarClientException(this ILogger logger, Exception 
exception);
+    [LoggerMessage(EventId = 4, Level = LogLevel.Warning, Message = "The 
consumer for topic '{topic}' has regained the connection")]
+    static partial void ConsumerRegainedConnection(this ILogger logger, string 
topic);
 }
 
 #pragma warning restore IDE0060 // Remove unused parameter
diff --git a/samples/Processing/Worker.cs b/samples/Processing/Worker.cs
index caa9d54..e471b29 100644
--- a/samples/Processing/Worker.cs
+++ b/samples/Processing/Worker.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -27,15 +27,22 @@ public class Worker : BackgroundService
     protected override async Task ExecuteAsync(CancellationToken 
cancellationToken)
     {
         await using var client = PulsarClient.Builder()
-            .ExceptionHandler(context => 
_logger.PulsarClientException(context))
+            .ExceptionHandler(_logger.PulsarClientException) // Optional
             .Build(); // Connecting to pulsar://localhost:6650
 
         await using var consumer = client.NewConsumer(Schema.String)
-            .StateChangedHandler(consumerStateChanged => 
_logger.ConsumerChangedState(consumerStateChanged))
+            .StateChangedHandler(_logger.ConsumerChangedState) // Optional
             .SubscriptionName("MySubscription")
             .Topic("persistent://public/default/mytopic")
             .Create();
 
+        _ = consumer.DelayedStateMonitor( // Recommended way of ignoring the 
short disconnects expected when working with a distributed system
+            ConsumerState.Active, // Operational state
+            TimeSpan.FromSeconds(5), // The amount of time allowed in 
non-operational state before we act
+            _logger.ConsumerLostConnection, // Invoked if we are NOT back in 
operational state after 5 seconds
+            _logger.ConsumerRegainedConnection, // Invoked when we are in 
operational state again
+            cancellationToken);
+
         await consumer.Process(ProcessMessage, cancellationToken);
     }
 
diff --git a/src/DotPulsar/Exceptions/AuthenticationException.cs 
b/src/DotPulsar/Exceptions/AuthenticationException.cs
index ae60915..64f05ad 100644
--- a/src/DotPulsar/Exceptions/AuthenticationException.cs
+++ b/src/DotPulsar/Exceptions/AuthenticationException.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -16,6 +16,9 @@ namespace DotPulsar.Exceptions;
 
 using System;
 
+/// <summary>
+/// Non valid authentication
+/// </summary>
 public sealed class AuthenticationException : DotPulsarException
 {
     public AuthenticationException(string message) : base(message) { }
diff --git a/src/DotPulsar/Exceptions/AuthorizationException.cs 
b/src/DotPulsar/Exceptions/AuthorizationException.cs
index 211e4ee..af5140f 100644
--- a/src/DotPulsar/Exceptions/AuthorizationException.cs
+++ b/src/DotPulsar/Exceptions/AuthorizationException.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -14,6 +14,9 @@
 
 namespace DotPulsar.Exceptions;
 
+/// <summary>
+/// Not authorized to use resource
+/// </summary>
 public sealed class AuthorizationException : DotPulsarException
 {
     public AuthorizationException(string message) : base(message) { }
diff --git a/src/DotPulsar/Exceptions/ChecksumException.cs 
b/src/DotPulsar/Exceptions/ChecksumException.cs
index 5fd1690..d7dbd69 100644
--- a/src/DotPulsar/Exceptions/ChecksumException.cs
+++ b/src/DotPulsar/Exceptions/ChecksumException.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -14,6 +14,9 @@
 
 namespace DotPulsar.Exceptions;
 
+/// <summary>
+/// Error while verifying message checksum
+/// </summary>
 public sealed class ChecksumException : DotPulsarException
 {
     public ChecksumException(string message) : base(message) { }
diff --git a/src/DotPulsar/Exceptions/ConsumerAssignException.cs 
b/src/DotPulsar/Exceptions/ConsumerAssignException.cs
index c8cf2fa..9402809 100644
--- a/src/DotPulsar/Exceptions/ConsumerAssignException.cs
+++ b/src/DotPulsar/Exceptions/ConsumerAssignException.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -14,6 +14,9 @@
 
 namespace DotPulsar.Exceptions;
 
+/// <summary>
+/// Dispatcher assign consumer error
+/// </summary>
 public sealed class ConsumerAssignException : DotPulsarException
 {
     public ConsumerAssignException(string message) : base(message) { }
diff --git a/src/DotPulsar/Exceptions/ConsumerBusyException.cs 
b/src/DotPulsar/Exceptions/ConsumerBusyException.cs
index 9a858e7..48da45b 100644
--- a/src/DotPulsar/Exceptions/ConsumerBusyException.cs
+++ b/src/DotPulsar/Exceptions/ConsumerBusyException.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -14,6 +14,9 @@
 
 namespace DotPulsar.Exceptions;
 
+/// <summary>
+/// Unable to subscribe/unsubscribe because other consumers are connected
+/// </summary>
 public sealed class ConsumerBusyException : DotPulsarException
 {
     public ConsumerBusyException(string message) : base(message) { }
diff --git a/src/DotPulsar/Exceptions/ConsumerNotFoundException.cs 
b/src/DotPulsar/Exceptions/ConsumerNotFoundException.cs
index f66432b..8705e79 100644
--- a/src/DotPulsar/Exceptions/ConsumerNotFoundException.cs
+++ b/src/DotPulsar/Exceptions/ConsumerNotFoundException.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -14,6 +14,9 @@
 
 namespace DotPulsar.Exceptions;
 
+/// <summary>
+/// Consumer not found
+/// </summary>
 public sealed class ConsumerNotFoundException : DotPulsarException
 {
     public ConsumerNotFoundException(string message) : base(message) { }
diff --git a/src/DotPulsar/Exceptions/IncompatibleSchemaException.cs 
b/src/DotPulsar/Exceptions/IncompatibleSchemaException.cs
index 2ed83ae..33c9df9 100644
--- a/src/DotPulsar/Exceptions/IncompatibleSchemaException.cs
+++ b/src/DotPulsar/Exceptions/IncompatibleSchemaException.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -14,6 +14,9 @@
 
 namespace DotPulsar.Exceptions;
 
+/// <summary>
+/// Specified schema was incompatible with topic schema
+/// </summary>
 public sealed class IncompatibleSchemaException : DotPulsarException
 {
     public IncompatibleSchemaException(string message) : base(message) { }
diff --git a/src/DotPulsar/Exceptions/InvalidTopicNameException.cs 
b/src/DotPulsar/Exceptions/InvalidTopicNameException.cs
index ff84f8b..5280f07 100644
--- a/src/DotPulsar/Exceptions/InvalidTopicNameException.cs
+++ b/src/DotPulsar/Exceptions/InvalidTopicNameException.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -14,6 +14,9 @@
 
 namespace DotPulsar.Exceptions;
 
+/// <summary>
+/// The topic name is not valid
+/// </summary>
 public sealed class InvalidTopicNameException : DotPulsarException
 {
     public InvalidTopicNameException(string message) : base(message) { }
diff --git a/src/DotPulsar/Exceptions/InvalidTransactionStatusException.cs 
b/src/DotPulsar/Exceptions/InvalidTransactionStatusException.cs
index caba91a..8d3a549 100644
--- a/src/DotPulsar/Exceptions/InvalidTransactionStatusException.cs
+++ b/src/DotPulsar/Exceptions/InvalidTransactionStatusException.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -16,6 +16,9 @@ namespace DotPulsar.Exceptions;
 
 using System;
 
+/// <summary>
+/// Invalid transaction status error
+/// </summary>
 public sealed class InvalidTransactionStatusException : DotPulsarException
 {
     public InvalidTransactionStatusException(string message) : base(message) { 
}
diff --git a/src/DotPulsar/Exceptions/MetadataException.cs 
b/src/DotPulsar/Exceptions/MetadataException.cs
index d8906cd..9002ab4 100644
--- a/src/DotPulsar/Exceptions/MetadataException.cs
+++ b/src/DotPulsar/Exceptions/MetadataException.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -14,6 +14,9 @@
 
 namespace DotPulsar.Exceptions;
 
+/// <summary>
+/// Error with ZK/metadata
+/// </summary>
 public sealed class MetadataException : DotPulsarException
 {
     public MetadataException(string message) : base(message) { }
diff --git a/src/DotPulsar/Exceptions/NotAllowedException.cs 
b/src/DotPulsar/Exceptions/NotAllowedException.cs
index 8d76b87..7c67048 100644
--- a/src/DotPulsar/Exceptions/NotAllowedException.cs
+++ b/src/DotPulsar/Exceptions/NotAllowedException.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -16,6 +16,9 @@ namespace DotPulsar.Exceptions;
 
 using System;
 
+/// <summary>
+/// Not allowed error
+/// </summary>
 public sealed class NotAllowedException : DotPulsarException
 {
     public NotAllowedException(string message) : base(message) { }
diff --git a/src/DotPulsar/Exceptions/PersistenceException.cs 
b/src/DotPulsar/Exceptions/PersistenceException.cs
index 2058aa2..11c9ab6 100644
--- a/src/DotPulsar/Exceptions/PersistenceException.cs
+++ b/src/DotPulsar/Exceptions/PersistenceException.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -14,6 +14,9 @@
 
 namespace DotPulsar.Exceptions;
 
+/// <summary>
+/// Error writing reading from BK
+/// </summary>
 public sealed class PersistenceException : DotPulsarException
 {
     public PersistenceException(string message) : base(message) { }
diff --git a/src/DotPulsar/Exceptions/ProducerBlockedQuotaExceededException.cs 
b/src/DotPulsar/Exceptions/ProducerBlockedQuotaExceededException.cs
index d644a49..237fc56 100644
--- a/src/DotPulsar/Exceptions/ProducerBlockedQuotaExceededException.cs
+++ b/src/DotPulsar/Exceptions/ProducerBlockedQuotaExceededException.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -14,6 +14,9 @@
 
 namespace DotPulsar.Exceptions;
 
+/// <summary>
+/// Unable to create producer because backlog quota exceeded
+/// </summary>
 public sealed class ProducerBlockedQuotaExceededException : DotPulsarException
 {
     public ProducerBlockedQuotaExceededException(string message) : 
base(message) { }
diff --git a/src/DotPulsar/Exceptions/ProducerBusyException.cs 
b/src/DotPulsar/Exceptions/ProducerBusyException.cs
index 5e573af..71fb1a5 100644
--- a/src/DotPulsar/Exceptions/ProducerBusyException.cs
+++ b/src/DotPulsar/Exceptions/ProducerBusyException.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -14,6 +14,9 @@
 
 namespace DotPulsar.Exceptions;
 
+/// <summary>
+/// Producer with same name is already connected
+/// </summary>
 public sealed class ProducerBusyException : DotPulsarException
 {
     public ProducerBusyException(string message) : base(message) { }
diff --git a/src/DotPulsar/Exceptions/ServiceNotReadyException.cs 
b/src/DotPulsar/Exceptions/ServiceNotReadyException.cs
index 053aed7..a2e2ee5 100644
--- a/src/DotPulsar/Exceptions/ServiceNotReadyException.cs
+++ b/src/DotPulsar/Exceptions/ServiceNotReadyException.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -14,6 +14,9 @@
 
 namespace DotPulsar.Exceptions;
 
+/// <summary>
+/// Any error that requires client retry operation with a fresh lookup
+/// </summary>
 public sealed class ServiceNotReadyException : DotPulsarException
 {
     public ServiceNotReadyException(string message) : base(message) { }
diff --git a/src/DotPulsar/Exceptions/SubscriptionNotFoundException.cs 
b/src/DotPulsar/Exceptions/SubscriptionNotFoundException.cs
index 64a7415..19684d6 100644
--- a/src/DotPulsar/Exceptions/SubscriptionNotFoundException.cs
+++ b/src/DotPulsar/Exceptions/SubscriptionNotFoundException.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -14,6 +14,9 @@
 
 namespace DotPulsar.Exceptions;
 
+/// <summary>
+/// Subscription not found
+/// </summary>
 public sealed class SubscriptionNotFoundException : DotPulsarException
 {
     public SubscriptionNotFoundException(string message) : base(message) { }
diff --git a/src/DotPulsar/Exceptions/TooManyRequestsException.cs 
b/src/DotPulsar/Exceptions/TooManyRequestsException.cs
index 0d67632..ebaae3b 100644
--- a/src/DotPulsar/Exceptions/TooManyRequestsException.cs
+++ b/src/DotPulsar/Exceptions/TooManyRequestsException.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -14,6 +14,9 @@
 
 namespace DotPulsar.Exceptions;
 
+/// <summary>
+/// Error with too many simultaneously request
+/// </summary>
 public sealed class TooManyRequestsException : DotPulsarException
 {
     public TooManyRequestsException(string message) : base(message) { }
diff --git a/src/DotPulsar/Exceptions/TopicNotFoundException.cs 
b/src/DotPulsar/Exceptions/TopicNotFoundException.cs
index c2585c5..0c9881a 100644
--- a/src/DotPulsar/Exceptions/TopicNotFoundException.cs
+++ b/src/DotPulsar/Exceptions/TopicNotFoundException.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -14,6 +14,9 @@
 
 namespace DotPulsar.Exceptions;
 
+/// <summary>
+/// Topic not found
+/// </summary>
 public sealed class TopicNotFoundException : DotPulsarException
 {
     public TopicNotFoundException(string message) : base(message) { }
diff --git a/src/DotPulsar/Exceptions/TopicTerminatedException.cs 
b/src/DotPulsar/Exceptions/TopicTerminatedException.cs
index 076f0d3..8002753 100644
--- a/src/DotPulsar/Exceptions/TopicTerminatedException.cs
+++ b/src/DotPulsar/Exceptions/TopicTerminatedException.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -14,6 +14,9 @@
 
 namespace DotPulsar.Exceptions;
 
+/// <summary>
+/// The topic has been terminated
+/// </summary>
 public sealed class TopicTerminatedException : DotPulsarException
 {
     public TopicTerminatedException(string message) : base(message) { }
diff --git a/src/DotPulsar/Exceptions/TransactionConflictException.cs 
b/src/DotPulsar/Exceptions/TransactionConflictException.cs
index 1a83c10..a138f0f 100644
--- a/src/DotPulsar/Exceptions/TransactionConflictException.cs
+++ b/src/DotPulsar/Exceptions/TransactionConflictException.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -16,6 +16,9 @@ namespace DotPulsar.Exceptions;
 
 using System;
 
+/// <summary>
+/// Ack with transaction conflict
+/// </summary>
 public sealed class TransactionConflictException : DotPulsarException
 {
     public TransactionConflictException(string message) : base(message) { }
diff --git 
a/src/DotPulsar/Exceptions/TransactionCoordinatorNotFoundException.cs 
b/src/DotPulsar/Exceptions/TransactionCoordinatorNotFoundException.cs
index c79117a..8f9c0f2 100644
--- a/src/DotPulsar/Exceptions/TransactionCoordinatorNotFoundException.cs
+++ b/src/DotPulsar/Exceptions/TransactionCoordinatorNotFoundException.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -16,6 +16,9 @@ namespace DotPulsar.Exceptions;
 
 using System;
 
+/// <summary>
+/// Transaction coordinator not found error
+/// </summary>
 public sealed class TransactionCoordinatorNotFoundException : 
DotPulsarException
 {
     public TransactionCoordinatorNotFoundException(string message) : 
base(message) { }
diff --git a/src/DotPulsar/Exceptions/UnsupportedVersionException.cs 
b/src/DotPulsar/Exceptions/UnsupportedVersionException.cs
index 28594a4..4c45054 100644
--- a/src/DotPulsar/Exceptions/UnsupportedVersionException.cs
+++ b/src/DotPulsar/Exceptions/UnsupportedVersionException.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -14,6 +14,9 @@
 
 namespace DotPulsar.Exceptions;
 
+/// <summary>
+/// Error when an older client/version doesn't support a required feature
+/// </summary>
 public sealed class UnsupportedVersionException : DotPulsarException
 {
     public UnsupportedVersionException(string message) : base(message) { }
diff --git a/src/DotPulsar/Internal/DefaultExceptionHandler.cs 
b/src/DotPulsar/Internal/DefaultExceptionHandler.cs
index 49c8027..0c6d59e 100644
--- a/src/DotPulsar/Internal/DefaultExceptionHandler.cs
+++ b/src/DotPulsar/Internal/DefaultExceptionHandler.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -25,19 +25,15 @@ using System.Threading.Tasks;
 
 public sealed class DefaultExceptionHandler : IHandleException
 {
-    private readonly TimeSpan _retryInterval;
-
-    public DefaultExceptionHandler(TimeSpan retryInterval)
-        => _retryInterval = retryInterval;
-
-    public async ValueTask OnException(ExceptionContext exceptionContext)
+    public ValueTask OnException(ExceptionContext exceptionContext)
     {
         exceptionContext.Result = 
DetermineFaultAction(exceptionContext.Exception, 
exceptionContext.CancellationToken);
-
-        if (exceptionContext.Result == FaultAction.Retry)
-            await Task.Delay(_retryInterval, 
exceptionContext.CancellationToken).ConfigureAwait(false);
-
         exceptionContext.ExceptionHandled = true;
+#if NET6_0_OR_GREATER
+        return ValueTask.CompletedTask;
+#else
+        return new ValueTask();
+#endif
     }
 
     private static FaultAction DetermineFaultAction(Exception exception, 
CancellationToken cancellationToken)
diff --git a/src/DotPulsar/Internal/ExceptionHandlerPipeline.cs 
b/src/DotPulsar/Internal/ExceptionHandlerPipeline.cs
index 00d943e..f6e0b7d 100644
--- a/src/DotPulsar/Internal/ExceptionHandlerPipeline.cs
+++ b/src/DotPulsar/Internal/ExceptionHandlerPipeline.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -15,6 +15,7 @@
 namespace DotPulsar.Internal;
 
 using DotPulsar.Abstractions;
+using System;
 using System.Collections.Generic;
 using System.Linq;
 using System.Threading.Tasks;
@@ -22,9 +23,13 @@ using System.Threading.Tasks;
 public sealed class ExceptionHandlerPipeline : IHandleException
 {
     private readonly IHandleException[] _handlers;
+    private readonly TimeSpan _retryInterval;
 
-    public ExceptionHandlerPipeline(IEnumerable<IHandleException> handlers)
-        => _handlers = handlers.ToArray();
+    public ExceptionHandlerPipeline(TimeSpan retryInterval, 
IEnumerable<IHandleException> handlers)
+    {
+        _retryInterval = retryInterval;
+        _handlers = handlers.ToArray();
+    }
 
     public async ValueTask OnException(ExceptionContext exceptionContext)
     {
@@ -35,5 +40,8 @@ public sealed class ExceptionHandlerPipeline : 
IHandleException
             if (exceptionContext.ExceptionHandled)
                 break;
         }
+
+        if (exceptionContext.Result == FaultAction.Retry)
+            await Task.Delay(_retryInterval, 
exceptionContext.CancellationToken).ConfigureAwait(false);
     }
 }
diff --git a/src/DotPulsar/Internal/PulsarClientBuilder.cs 
b/src/DotPulsar/Internal/PulsarClientBuilder.cs
index 373667f..0aaee42 100644
--- a/src/DotPulsar/Internal/PulsarClientBuilder.cs
+++ b/src/DotPulsar/Internal/PulsarClientBuilder.cs
@@ -171,8 +171,8 @@ public sealed class PulsarClientBuilder : 
IPulsarClientBuilder
 
         var connector = new Connector(_clientCertificates, 
_trustedCertificateAuthority, _verifyCertificateAuthority, 
_verifyCertificateName, _checkCertificateRevocation);
 
-        var exceptionHandlers = new List<IHandleException>(_exceptionHandlers) 
{ new DefaultExceptionHandler(_retryInterval) };
-        var exceptionHandlerPipeline = new 
ExceptionHandlerPipeline(exceptionHandlers);
+        var exceptionHandlers = new List<IHandleException>(_exceptionHandlers) 
{ new DefaultExceptionHandler() };
+        var exceptionHandlerPipeline = new 
ExceptionHandlerPipeline(_retryInterval, exceptionHandlers);
         var connectionPool = new ConnectionPool(_commandConnect, _serviceUrl, 
connector, _encryptionPolicy.Value, _closeInactiveConnectionsInterval, 
_listenerName, _keepAliveInterval, _authentication);
         var processManager = new ProcessManager(connectionPool);
 
diff --git a/src/DotPulsar/Internal/PulsarStream.cs 
b/src/DotPulsar/Internal/PulsarStream.cs
index af41066..13488bc 100644
--- a/src/DotPulsar/Internal/PulsarStream.cs
+++ b/src/DotPulsar/Internal/PulsarStream.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -28,11 +28,11 @@ using System.Threading.Tasks;
 
 public sealed class PulsarStream : IPulsarStream
 {
-    private const int _frameSizePrefix = 4;
-    private const int _unknownFrameSize = 0;
-    private const long _pauseAtMoreThan10Mb = 10485760;
-    private const long _resumeAt5MbOrLess = 5242881;
-    private const int _chunkSize = 75000;
+    private const int FrameSizePrefix = 4;
+    private const int UnknownFrameSize = 0;
+    private const long PauseAtMoreThan10Mb = 10485760;
+    private const long ResumeAt5MbOrLess = 5242881;
+    private const int ChunkSize = 75000;
 
     private readonly Stream _stream;
     private readonly ChunkingPipeline _pipeline;
@@ -43,8 +43,8 @@ public sealed class PulsarStream : IPulsarStream
     public PulsarStream(Stream stream)
     {
         _stream = stream;
-        _pipeline = new ChunkingPipeline(stream, _chunkSize);
-        var options = new PipeOptions(pauseWriterThreshold: 
_pauseAtMoreThan10Mb, resumeWriterThreshold: _resumeAt5MbOrLess);
+        _pipeline = new ChunkingPipeline(stream, ChunkSize);
+        var options = new PipeOptions(pauseWriterThreshold: 
PauseAtMoreThan10Mb, resumeWriterThreshold: ResumeAt5MbOrLess);
         var pipe = new Pipe(options);
         _reader = pipe.Reader;
         _writer = pipe.Writer;
@@ -119,33 +119,33 @@ public sealed class PulsarStream : IPulsarStream
 
         try
         {
-            var frameSize = _unknownFrameSize;
+            var frameSize = UnknownFrameSize;
             var totalSize = 0;
 
             while (true)
             {
-                var minimumSize = _frameSizePrefix + frameSize;
+                var minimumSize = FrameSizePrefix + frameSize;
                 var readResult = await _reader.ReadAtLeastAsync(minimumSize, 
cancellationToken).ConfigureAwait(false);
                 var buffer = readResult.Buffer;
 
                 while (true)
                 {
-                    if (buffer.Length < _frameSizePrefix)
+                    if (buffer.Length < FrameSizePrefix)
                         break;
 
-                    if (frameSize == _unknownFrameSize)
+                    if (frameSize == UnknownFrameSize)
                     {
                         frameSize = (int) buffer.ReadUInt32(0, true);
-                        totalSize = _frameSizePrefix + frameSize;
+                        totalSize = FrameSizePrefix + frameSize;
                     }
 
                     if (buffer.Length < totalSize)
                         break;
 
-                    yield return buffer.Slice(_frameSizePrefix, frameSize);
+                    yield return buffer.Slice(FrameSizePrefix, frameSize);
 
                     buffer = buffer.Slice(totalSize);
-                    frameSize = _unknownFrameSize;
+                    frameSize = UnknownFrameSize;
                 }
 
                 if (readResult.IsCompleted)

Reply via email to