This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new fabc9e3 Adding some extension methods and moving certain exceptions
to 'public' in order to create a lean and correct Processing sample (without
warnings)
fabc9e3 is described below
commit fabc9e34138b83280306b64cfc0003e7d5faf013
Author: Daniel Blankensteiner <[email protected]>
AuthorDate: Thu Feb 3 17:29:07 2022 +0100
Adding some extension methods and moving certain exceptions to 'public' in
order to create a lean and correct Processing sample (without warnings)
---
samples/Processing/LoggerExtensions.cs | 65 ++++++++++++++++++++++
samples/Processing/Processing.csproj | 2 +-
samples/Processing/Worker.cs | 25 ++-------
.../Exceptions/ChannelNotReadyException.cs | 6 +-
.../Exceptions/ConsumerNotFoundException.cs | 4 +-
.../Exceptions/ServiceNotReadyException.cs | 4 +-
.../Exceptions/TooManyRequestsException.cs | 4 +-
.../Extensions/ConsumerBuilderExtensions.cs | 12 ++++
.../Extensions/ProducerBuilderExtensions.cs | 12 ++++
.../Extensions/ReaderBuilderExtensions.cs | 12 ++++
src/DotPulsar/Internal/NotReadyChannel.cs | 2 +-
11 files changed, 113 insertions(+), 35 deletions(-)
diff --git a/samples/Processing/LoggerExtensions.cs
b/samples/Processing/LoggerExtensions.cs
new file mode 100644
index 0000000..9c46547
--- /dev/null
+++ b/samples/Processing/LoggerExtensions.cs
@@ -0,0 +1,65 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Processing;
+
+using DotPulsar;
+using DotPulsar.Abstractions;
+using DotPulsar.Exceptions;
+
+#pragma warning disable IDE0079 // Remove unnecessary suppression... Ehm...
*sigh*
+#pragma warning disable IDE0060 // Remove unused parameter... Why Microsoft?
Why do you force me to do this?
+
+public static partial class LoggerExtensions
+{
+ // ConsumerChangedState: logs the consumer's topic and new state; Disconnected is logged as Warning, Faulted as Error and every other state as Information
+ public static void ConsumerChangedState(this ILogger logger,
ConsumerStateChanged stateChanged)
+ {
+ var logLevel = stateChanged.ConsumerState switch
+ {
+ ConsumerState.Disconnected => LogLevel.Warning,
+ ConsumerState.Faulted => LogLevel.Error,
+ _ => LogLevel.Information
+ };
+
+ logger.ConsumerChangedState(logLevel, stateChanged.Consumer.Topic,
stateChanged.ConsumerState.ToString());
+ }
+
+ [LoggerMessage(EventId = 0, Message = "The consumer for topic '{topic}'
changed state to '{state}'")]
+ static partial void ConsumerChangedState(this ILogger logger, LogLevel
logLevel, string topic, string state);
+
+ // OutputMessage: logs the message's payload (message.Value()) together with its publish time (PublishTimeAsDateTime) at Information level
+ public static void OutputMessage(this ILogger logger, IMessage<string>
message)
+ {
+ var publishedOn = message.PublishTimeAsDateTime;
+ var payload = message.Value();
+ logger.OutputMessage(publishedOn, payload);
+ }
+
+ [LoggerMessage(EventId = 1, Level = LogLevel.Information, Message =
"Received: '{payload}' published on {publishedOn}")]
+ static partial void OutputMessage(this ILogger logger, DateTime
publishedOn, string payload);
+
+ // PulsarClientException: logs the exception from the context at Warning level, except ChannelNotReadyException, which is skipped
+ public static void PulsarClientException(this ILogger logger,
ExceptionContext exceptionContext)
+ {
+ if (exceptionContext.Exception is not ChannelNotReadyException)
+ logger.PulsarClientException(exceptionContext.Exception);
+ }
+
+ [LoggerMessage(EventId = 2, Level = LogLevel.Warning, Message = "The
PulsarClient got an exception")]
+ static partial void PulsarClientException(this ILogger logger, Exception
exception);
+}
+
+#pragma warning restore IDE0060 // Remove unused parameter
+#pragma warning restore IDE0079 // Remove unnecessary suppression
diff --git a/samples/Processing/Processing.csproj
b/samples/Processing/Processing.csproj
index b5ec451..ad4d9a8 100644
--- a/samples/Processing/Processing.csproj
+++ b/samples/Processing/Processing.csproj
@@ -1,4 +1,4 @@
-<Project Sdk="Microsoft.NET.Sdk.Worker">
+<Project Sdk="Microsoft.NET.Sdk.Worker">
<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
diff --git a/samples/Processing/Worker.cs b/samples/Processing/Worker.cs
index d4d639d..e753b52 100644
--- a/samples/Processing/Worker.cs
+++ b/samples/Processing/Worker.cs
@@ -26,10 +26,12 @@ public class Worker : BackgroundService
protected override async Task ExecuteAsync(CancellationToken
cancellationToken)
{
- await using var client = PulsarClient.Builder().Build(); //Connecting
to pulsar://localhost:6650
+ await using var client = PulsarClient.Builder()
+ .ExceptionHandler(context =>
_logger.PulsarClientException(context))
+ .Build(); //Connecting to pulsar://localhost:6650
await using var consumer = client.NewConsumer(Schema.String)
- .StateChangedHandler(Monitor, cancellationToken)
+ .StateChangedHandler(consumerStateChanged =>
_logger.ConsumerChangedState(consumerStateChanged))
.SubscriptionName("MySubscription")
.Topic("persistent://public/default/mytopic")
.Create();
@@ -39,24 +41,7 @@ public class Worker : BackgroundService
private ValueTask ProcessMessage(IMessage<string> message,
CancellationToken cancellationToken)
{
- _logger.LogInformation($"Received: {message.Value()}");
+ _logger.OutputMessage(message);
return ValueTask.CompletedTask;
}
-
- private void Monitor(ConsumerStateChanged stateChanged, CancellationToken
cancellationToken)
- {
- var stateMessage = stateChanged.ConsumerState switch
- {
- ConsumerState.Active => "is active",
- ConsumerState.Inactive => "is inactive",
- ConsumerState.Disconnected => "is disconnected",
- ConsumerState.Closed => "has closed",
- ConsumerState.ReachedEndOfTopic => "has reached end of topic",
- ConsumerState.Faulted => "has faulted",
- _ => $"has an unknown state '{stateChanged.ConsumerState}'"
- };
-
- var topic = stateChanged.Consumer.Topic;
- _logger.LogInformation($"The consumer for topic '{topic}'
{stateMessage}");
- }
}
diff --git a/src/DotPulsar/Internal/Exceptions/ChannelNotReadyException.cs
b/src/DotPulsar/Exceptions/ChannelNotReadyException.cs
similarity index 80%
rename from src/DotPulsar/Internal/Exceptions/ChannelNotReadyException.cs
rename to src/DotPulsar/Exceptions/ChannelNotReadyException.cs
index 599c0b2..712734b 100644
--- a/src/DotPulsar/Internal/Exceptions/ChannelNotReadyException.cs
+++ b/src/DotPulsar/Exceptions/ChannelNotReadyException.cs
@@ -12,11 +12,9 @@
* limitations under the License.
*/
-namespace DotPulsar.Internal.Exceptions;
-
-using DotPulsar.Exceptions;
+namespace DotPulsar.Exceptions;
public sealed class ChannelNotReadyException : DotPulsarException
{
- public ChannelNotReadyException() : base("The service is not ready yet") {
}
+ public ChannelNotReadyException() : base("The channel is not ready yet")
{ }
}
diff --git a/src/DotPulsar/Internal/Exceptions/ConsumerNotFoundException.cs
b/src/DotPulsar/Exceptions/ConsumerNotFoundException.cs
similarity index 91%
rename from src/DotPulsar/Internal/Exceptions/ConsumerNotFoundException.cs
rename to src/DotPulsar/Exceptions/ConsumerNotFoundException.cs
index be58640..f66432b 100644
--- a/src/DotPulsar/Internal/Exceptions/ConsumerNotFoundException.cs
+++ b/src/DotPulsar/Exceptions/ConsumerNotFoundException.cs
@@ -12,9 +12,7 @@
* limitations under the License.
*/
-namespace DotPulsar.Internal.Exceptions;
-
-using DotPulsar.Exceptions;
+namespace DotPulsar.Exceptions;
public sealed class ConsumerNotFoundException : DotPulsarException
{
diff --git a/src/DotPulsar/Internal/Exceptions/ServiceNotReadyException.cs
b/src/DotPulsar/Exceptions/ServiceNotReadyException.cs
similarity index 90%
rename from src/DotPulsar/Internal/Exceptions/ServiceNotReadyException.cs
rename to src/DotPulsar/Exceptions/ServiceNotReadyException.cs
index 35e9a51..053aed7 100644
--- a/src/DotPulsar/Internal/Exceptions/ServiceNotReadyException.cs
+++ b/src/DotPulsar/Exceptions/ServiceNotReadyException.cs
@@ -12,9 +12,7 @@
* limitations under the License.
*/
-namespace DotPulsar.Internal.Exceptions;
-
-using DotPulsar.Exceptions;
+namespace DotPulsar.Exceptions;
public sealed class ServiceNotReadyException : DotPulsarException
{
diff --git a/src/DotPulsar/Internal/Exceptions/TooManyRequestsException.cs
b/src/DotPulsar/Exceptions/TooManyRequestsException.cs
similarity index 90%
rename from src/DotPulsar/Internal/Exceptions/TooManyRequestsException.cs
rename to src/DotPulsar/Exceptions/TooManyRequestsException.cs
index 22ef3ae..0d67632 100644
--- a/src/DotPulsar/Internal/Exceptions/TooManyRequestsException.cs
+++ b/src/DotPulsar/Exceptions/TooManyRequestsException.cs
@@ -12,9 +12,7 @@
* limitations under the License.
*/
-namespace DotPulsar.Internal.Exceptions;
-
-using DotPulsar.Exceptions;
+namespace DotPulsar.Exceptions;
public sealed class TooManyRequestsException : DotPulsarException
{
diff --git a/src/DotPulsar/Extensions/ConsumerBuilderExtensions.cs
b/src/DotPulsar/Extensions/ConsumerBuilderExtensions.cs
index 7ab0354..5cecd16 100644
--- a/src/DotPulsar/Extensions/ConsumerBuilderExtensions.cs
+++ b/src/DotPulsar/Extensions/ConsumerBuilderExtensions.cs
@@ -30,6 +30,18 @@ public static class ConsumerBuilderExtensions
/// </summary>
public static IConsumerBuilder<TMessage> StateChangedHandler<TMessage>(
this IConsumerBuilder<TMessage> builder,
+ Action<ConsumerStateChanged> handler)
+ {
+ void forwarder(ConsumerStateChanged consumerStateChanged,
CancellationToken _) => handler(consumerStateChanged);
+ builder.StateChangedHandler(new
ActionStateChangedHandler<ConsumerStateChanged>(forwarder, default));
+ return builder;
+ }
+
+ /// <summary>
+ /// Register a state changed handler.
+ /// </summary>
+ public static IConsumerBuilder<TMessage> StateChangedHandler<TMessage>(
+ this IConsumerBuilder<TMessage> builder,
Action<ConsumerStateChanged, CancellationToken> handler,
CancellationToken cancellationToken = default)
{
diff --git a/src/DotPulsar/Extensions/ProducerBuilderExtensions.cs
b/src/DotPulsar/Extensions/ProducerBuilderExtensions.cs
index 907e770..11cc6c7 100644
--- a/src/DotPulsar/Extensions/ProducerBuilderExtensions.cs
+++ b/src/DotPulsar/Extensions/ProducerBuilderExtensions.cs
@@ -30,6 +30,18 @@ public static class ProducerBuilderExtensions
/// </summary>
public static IProducerBuilder<TMessage> StateChangedHandler<TMessage>(
this IProducerBuilder<TMessage> builder,
+ Action<ProducerStateChanged> handler)
+ {
+ void forwarder(ProducerStateChanged producerStateChanged,
CancellationToken _) => handler(producerStateChanged);
+ builder.StateChangedHandler(new
ActionStateChangedHandler<ProducerStateChanged>(forwarder, default));
+ return builder;
+ }
+
+ /// <summary>
+ /// Register a state changed handler.
+ /// </summary>
+ public static IProducerBuilder<TMessage> StateChangedHandler<TMessage>(
+ this IProducerBuilder<TMessage> builder,
Action<ProducerStateChanged, CancellationToken> handler,
CancellationToken cancellationToken = default)
{
diff --git a/src/DotPulsar/Extensions/ReaderBuilderExtensions.cs
b/src/DotPulsar/Extensions/ReaderBuilderExtensions.cs
index 32f7505..555a5af 100644
--- a/src/DotPulsar/Extensions/ReaderBuilderExtensions.cs
+++ b/src/DotPulsar/Extensions/ReaderBuilderExtensions.cs
@@ -30,6 +30,18 @@ public static class ReaderBuilderExtensions
/// </summary>
public static IReaderBuilder<TMessage> StateChangedHandler<TMessage>(
this IReaderBuilder<TMessage> builder,
+ Action<ReaderStateChanged> handler)
+ {
+ void forwarder(ReaderStateChanged readerStateChanged,
CancellationToken _) => handler(readerStateChanged);
+ builder.StateChangedHandler(new
ActionStateChangedHandler<ReaderStateChanged>(forwarder, default));
+ return builder;
+ }
+
+ /// <summary>
+ /// Register a state changed handler.
+ /// </summary>
+ public static IReaderBuilder<TMessage> StateChangedHandler<TMessage>(
+ this IReaderBuilder<TMessage> builder,
Action<ReaderStateChanged, CancellationToken> handler,
CancellationToken cancellationToken = default)
{
diff --git a/src/DotPulsar/Internal/NotReadyChannel.cs
b/src/DotPulsar/Internal/NotReadyChannel.cs
index da1a7af..a3c990e 100644
--- a/src/DotPulsar/Internal/NotReadyChannel.cs
+++ b/src/DotPulsar/Internal/NotReadyChannel.cs
@@ -16,7 +16,7 @@ namespace DotPulsar.Internal;
using Abstractions;
using DotPulsar.Abstractions;
-using Exceptions;
+using DotPulsar.Exceptions;
using PulsarApi;
using System;
using System.Buffers;