This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 061f553  Changed the Consuming sample project from a console app to a 
worker service and created sample workers for Receive, Messages, and Process. 
The Processing sample is no longer needed, since it has been moved into the 
Consuming sample.
061f553 is described below

commit 061f553530d50e7fde2ca6fb84d949804bda58c7
Author: Daniel Blankensteiner <[email protected]>
AuthorDate: Wed Feb 5 14:06:30 2025 +0100

    Changed the Consuming sample project from a console app to a worker service 
and created sample workers for Receive, Messages, and Process. The Processing 
sample is no longer needed, since it has been moved into the Consuming sample.
---
 DotPulsar.sln                                      |  7 ---
 samples/Consuming/Consuming.csproj                 | 10 +++-
 .../Worker.cs => Consuming/MessagesWorker.cs}      | 31 +++++-------
 .../Worker.cs => Consuming/ProcessWorker.cs}       | 17 ++++---
 samples/Consuming/Program.cs                       | 55 ++++------------------
 .../Properties/launchSettings.json                 |  0
 .../Worker.cs => Consuming/ReceiveWorker.cs}       | 32 +++++--------
 .../appsettings.Development.json                   |  0
 samples/{Processing => Consuming}/appsettings.json |  0
 samples/Extensions/LoggerExtensions.cs             | 10 ++--
 samples/Processing/Processing.csproj               | 19 --------
 samples/Processing/Program.cs                      | 22 ---------
 12 files changed, 56 insertions(+), 147 deletions(-)

diff --git a/DotPulsar.sln b/DotPulsar.sln
index 032bfca..834d99e 100644
--- a/DotPulsar.sln
+++ b/DotPulsar.sln
@@ -24,8 +24,6 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution 
Items", "Solution
                README.md = README.md
        EndProjectSection
 EndProject
-Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Processing", 
"samples\Processing\Processing.csproj", "{CC1494FA-4EB5-4DB9-8BE9-0A6E8D0D963E}"
-EndProject
 Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Compression", 
"benchmarks\Compression\Compression.csproj", 
"{040F8253-074D-4977-BDB1-0D9798B52CE2}"
 EndProject
 Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Benchmarks", 
"Benchmarks", "{2C57AF4B-0D23-42D7-86FE-80277FD52875}"
@@ -60,10 +58,6 @@ Global
                {6D44683B-865C-4D15-9F0A-1A8441354589}.Debug|Any CPU.Build.0 = 
Debug|Any CPU
                {6D44683B-865C-4D15-9F0A-1A8441354589}.Release|Any 
CPU.ActiveCfg = Release|Any CPU
                {6D44683B-865C-4D15-9F0A-1A8441354589}.Release|Any CPU.Build.0 
= Release|Any CPU
-               {CC1494FA-4EB5-4DB9-8BE9-0A6E8D0D963E}.Debug|Any CPU.ActiveCfg 
= Debug|Any CPU
-               {CC1494FA-4EB5-4DB9-8BE9-0A6E8D0D963E}.Debug|Any CPU.Build.0 = 
Debug|Any CPU
-               {CC1494FA-4EB5-4DB9-8BE9-0A6E8D0D963E}.Release|Any 
CPU.ActiveCfg = Release|Any CPU
-               {CC1494FA-4EB5-4DB9-8BE9-0A6E8D0D963E}.Release|Any CPU.Build.0 
= Release|Any CPU
                {040F8253-074D-4977-BDB1-0D9798B52CE2}.Debug|Any CPU.ActiveCfg 
= Debug|Any CPU
                {040F8253-074D-4977-BDB1-0D9798B52CE2}.Debug|Any CPU.Build.0 = 
Debug|Any CPU
                {040F8253-074D-4977-BDB1-0D9798B52CE2}.Release|Any 
CPU.ActiveCfg = Release|Any CPU
@@ -85,7 +79,6 @@ Global
                {2A810EB9-45CE-4593-8E4C-026E0CBB3C42} = 
{E7106D0F-B255-4631-9FB8-734FC5748FA9}
                {14934BED-A222-47B2-A58A-CFC4AAB89B49} = 
{E7106D0F-B255-4631-9FB8-734FC5748FA9}
                {6D44683B-865C-4D15-9F0A-1A8441354589} = 
{E7106D0F-B255-4631-9FB8-734FC5748FA9}
-               {CC1494FA-4EB5-4DB9-8BE9-0A6E8D0D963E} = 
{E7106D0F-B255-4631-9FB8-734FC5748FA9}
                {040F8253-074D-4977-BDB1-0D9798B52CE2} = 
{2C57AF4B-0D23-42D7-86FE-80277FD52875}
                {366ACA6A-7DA2-4E51-AC27-7B570DFFE5D2} = 
{E7106D0F-B255-4631-9FB8-734FC5748FA9}
                {F211960B-F2C6-4878-B1AD-72D63CDB3B7A} = 
{E7106D0F-B255-4631-9FB8-734FC5748FA9}
diff --git a/samples/Consuming/Consuming.csproj 
b/samples/Consuming/Consuming.csproj
index 1fd8f8f..3bbd0da 100644
--- a/samples/Consuming/Consuming.csproj
+++ b/samples/Consuming/Consuming.csproj
@@ -1,13 +1,19 @@
-<Project Sdk="Microsoft.NET.Sdk">
+<Project Sdk="Microsoft.NET.Sdk.Worker">
 
   <PropertyGroup>
-    <OutputType>Exe</OutputType>
     <TargetFramework>net9.0</TargetFramework>
+    <Nullable>enable</Nullable>
     <ImplicitUsings>enable</ImplicitUsings>
+    <NoWarn>CA2012</NoWarn>
   </PropertyGroup>
 
+  <ItemGroup>
+    <PackageReference Include="Microsoft.Extensions.Hosting" Version="9.0.1" />
+  </ItemGroup>
+
   <ItemGroup>
     <ProjectReference Include="..\..\src\DotPulsar\DotPulsar.csproj" />
+    <ProjectReference Include="..\Extensions\Extensions.csproj" />
   </ItemGroup>
 
 </Project>
diff --git a/samples/Processing/Worker.cs b/samples/Consuming/MessagesWorker.cs
similarity index 53%
copy from samples/Processing/Worker.cs
copy to samples/Consuming/MessagesWorker.cs
index 7048c39..d71761c 100644
--- a/samples/Processing/Worker.cs
+++ b/samples/Consuming/MessagesWorker.cs
@@ -12,20 +12,20 @@
  * limitations under the License.
  */
 
-namespace Processing;
+namespace Consuming;
 
 using DotPulsar;
-using DotPulsar.Abstractions;
 using DotPulsar.Extensions;
 using Extensions;
+using System.Threading;
 
-public sealed class Worker : BackgroundService
+public sealed class MessagesWorker : BackgroundService
 {
-    private readonly ILogger<Worker> _logger;
+    private readonly ILogger _logger;
 
-    public Worker(ILogger<Worker> logger) => _logger = logger;
+    public MessagesWorker(ILogger<MessagesWorker> logger) => _logger = logger;
 
-    protected override async Task ExecuteAsync(CancellationToken 
cancellationToken)
+    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
     {
         await using var client = PulsarClient.Builder()
             .ExceptionHandler(_logger.PulsarClientException) // Optional
@@ -37,19 +37,12 @@ public sealed class Worker : BackgroundService
             .Topic("persistent://public/default/mytopic")
             .Create();
 
-        _ = consumer.DelayedStateMonitor(       // Recommended way of ignoring 
the short disconnects expected when working with a distributed system
-            ConsumerState.Active,               // Operational state
-            TimeSpan.FromSeconds(5),            // The amount of time allowed 
in non-operational state before we act
-            _logger.ConsumerLostConnection,     // Invoked if we are NOT back 
in operational state after 5 seconds
-            _logger.ConsumerRegainedConnection, // Invoked when we are in 
operational state again
-            cancellationToken);
+        _logger.LogInformation("Will start receiving messages with 
'Messages'");
 
-        await consumer.Process(ProcessMessage, cancellationToken);
-    }
-
-    private ValueTask ProcessMessage(IMessage<string> message, 
CancellationToken cancellationToken)
-    {
-        _logger.OutputMessage(message);
-        return ValueTask.CompletedTask;
+        await foreach (var message in consumer.Messages(stoppingToken))
+        {
+            _logger.OutputMessage(message);
+            await consumer.Acknowledge(message, stoppingToken);
+        }
     }
 }
diff --git a/samples/Processing/Worker.cs b/samples/Consuming/ProcessWorker.cs
similarity index 83%
copy from samples/Processing/Worker.cs
copy to samples/Consuming/ProcessWorker.cs
index 7048c39..f86a0dd 100644
--- a/samples/Processing/Worker.cs
+++ b/samples/Consuming/ProcessWorker.cs
@@ -12,20 +12,21 @@
  * limitations under the License.
  */
 
-namespace Processing;
+namespace Consuming;
 
 using DotPulsar;
 using DotPulsar.Abstractions;
 using DotPulsar.Extensions;
 using Extensions;
+using System.Threading;
 
-public sealed class Worker : BackgroundService
+public sealed class ProcessWorker : BackgroundService
 {
-    private readonly ILogger<Worker> _logger;
+    private readonly ILogger _logger;
 
-    public Worker(ILogger<Worker> logger) => _logger = logger;
+    public ProcessWorker(ILogger<ProcessWorker> logger) => _logger = logger;
 
-    protected override async Task ExecuteAsync(CancellationToken 
cancellationToken)
+    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
     {
         await using var client = PulsarClient.Builder()
             .ExceptionHandler(_logger.PulsarClientException) // Optional
@@ -42,9 +43,11 @@ public sealed class Worker : BackgroundService
             TimeSpan.FromSeconds(5),            // The amount of time allowed 
in non-operational state before we act
             _logger.ConsumerLostConnection,     // Invoked if we are NOT back 
in operational state after 5 seconds
             _logger.ConsumerRegainedConnection, // Invoked when we are in 
operational state again
-            cancellationToken);
+            stoppingToken);
 
-        await consumer.Process(ProcessMessage, cancellationToken);
+        _logger.LogInformation("Will start receiving messages with 'Process'");
+
+        await consumer.Process(ProcessMessage, stoppingToken);
     }
 
     private ValueTask ProcessMessage(IMessage<string> message, 
CancellationToken cancellationToken)
diff --git a/samples/Consuming/Program.cs b/samples/Consuming/Program.cs
index 649b3c4..bfd9238 100644
--- a/samples/Consuming/Program.cs
+++ b/samples/Consuming/Program.cs
@@ -12,48 +12,13 @@
  * limitations under the License.
  */
 
-using DotPulsar;
-using DotPulsar.Abstractions;
-using DotPulsar.Extensions;
-
-var cts = new CancellationTokenSource();
-
-Console.CancelKeyPress += (sender, args) =>
-{
-    cts.Cancel();
-    args.Cancel = true;
-};
-
-await using var client = PulsarClient
-    .Builder()
-    .ExceptionHandler(ExceptionHandler)
-    .Build(); // Connecting to pulsar://localhost:6650
-
-await using var consumer = client.NewConsumer(Schema.String)
-    .StateChangedHandler(StateChangedHandler)
-    .SubscriptionName("MySubscription")
-    .Topic("persistent://public/default/mytopic")
-    .Create();
-
-Console.WriteLine("Press Ctrl+C to exit");
-
-await ConsumeMessages(consumer, cts.Token);
-
-async Task ConsumeMessages(IConsumer<string> consumer, CancellationToken 
cancellationToken)
-{
-    try
-    {
-        await foreach (var message in consumer.Messages(cancellationToken))
-        {
-            Console.WriteLine($"Received: {message.Value()}");
-            await consumer.Acknowledge(message, cancellationToken);
-        }
-    }
-    catch (OperationCanceledException) { }
-}
-
-void ExceptionHandler(ExceptionContext context) =>
-    Console.WriteLine($"The PulsarClient got an exception: 
{context.Exception}");
-
-void StateChangedHandler(ConsumerStateChanged stateChanged) =>
-    Console.WriteLine($"The consumer for topic '{stateChanged.Consumer.Topic}' 
changed state to '{stateChanged.ConsumerState}'");
+using Consuming;
+using Extensions;
+
+await Host
+    .CreateApplicationBuilder(args)
+    //.AddHostedService<ReceiveWorker>()  // Using 'Receive'
+    //.AddHostedService<MessagesWorker>() // Using 'Messages'
+    .AddHostedService<ProcessWorker>()  // Using 'Process' (recommended)
+    .Build()
+    .RunAsync();
diff --git a/samples/Processing/Properties/launchSettings.json 
b/samples/Consuming/Properties/launchSettings.json
similarity index 100%
rename from samples/Processing/Properties/launchSettings.json
rename to samples/Consuming/Properties/launchSettings.json
diff --git a/samples/Processing/Worker.cs b/samples/Consuming/ReceiveWorker.cs
similarity index 53%
rename from samples/Processing/Worker.cs
rename to samples/Consuming/ReceiveWorker.cs
index 7048c39..52e8698 100644
--- a/samples/Processing/Worker.cs
+++ b/samples/Consuming/ReceiveWorker.cs
@@ -12,20 +12,20 @@
  * limitations under the License.
  */
 
-namespace Processing;
+namespace Consuming;
 
 using DotPulsar;
-using DotPulsar.Abstractions;
 using DotPulsar.Extensions;
 using Extensions;
+using System.Threading;
 
-public sealed class Worker : BackgroundService
+public sealed class ReceiveWorker : BackgroundService
 {
-    private readonly ILogger<Worker> _logger;
+    private readonly ILogger _logger;
 
-    public Worker(ILogger<Worker> logger) => _logger = logger;
+    public ReceiveWorker(ILogger<ReceiveWorker> logger) => _logger = logger;
 
-    protected override async Task ExecuteAsync(CancellationToken 
cancellationToken)
+    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
     {
         await using var client = PulsarClient.Builder()
             .ExceptionHandler(_logger.PulsarClientException) // Optional
@@ -37,19 +37,13 @@ public sealed class Worker : BackgroundService
             .Topic("persistent://public/default/mytopic")
             .Create();
 
-        _ = consumer.DelayedStateMonitor(       // Recommended way of ignoring 
the short disconnects expected when working with a distributed system
-            ConsumerState.Active,               // Operational state
-            TimeSpan.FromSeconds(5),            // The amount of time allowed 
in non-operational state before we act
-            _logger.ConsumerLostConnection,     // Invoked if we are NOT back 
in operational state after 5 seconds
-            _logger.ConsumerRegainedConnection, // Invoked when we are in 
operational state again
-            cancellationToken);
+        _logger.LogInformation("Will start receiving messages with 'Receive'");
 
-        await consumer.Process(ProcessMessage, cancellationToken);
-    }
-
-    private ValueTask ProcessMessage(IMessage<string> message, 
CancellationToken cancellationToken)
-    {
-        _logger.OutputMessage(message);
-        return ValueTask.CompletedTask;
+        while (!stoppingToken.IsCancellationRequested)
+        {
+            var message = await consumer.Receive(stoppingToken);
+            _logger.OutputMessage(message);
+            await consumer.Acknowledge(message, stoppingToken);
+        }
     }
 }
diff --git a/samples/Processing/appsettings.Development.json 
b/samples/Consuming/appsettings.Development.json
similarity index 100%
rename from samples/Processing/appsettings.Development.json
rename to samples/Consuming/appsettings.Development.json
diff --git a/samples/Processing/appsettings.json 
b/samples/Consuming/appsettings.json
similarity index 100%
rename from samples/Processing/appsettings.json
rename to samples/Consuming/appsettings.json
diff --git a/samples/Extensions/LoggerExtensions.cs 
b/samples/Extensions/LoggerExtensions.cs
index b093d28..5c0a2a8 100644
--- a/samples/Extensions/LoggerExtensions.cs
+++ b/samples/Extensions/LoggerExtensions.cs
@@ -40,14 +40,10 @@ public static partial class LoggerExtensions
     /// Default logger for an IMessage with a string value
     /// </summary>
     public static void OutputMessage(this ILogger logger, IMessage<string> 
message)
-    {
-        var publishedOn = message.PublishTimeAsDateTime;
-        var payload = message.Value();
-        logger.OutputMessage(publishedOn, payload);
-    }
+        => logger.OutputMessage(message.Value(), 
message.PublishTimeAsDateTime, message.MessageId);
 
-    [LoggerMessage(EventId = 1, Level = LogLevel.Information, Message = 
"Received: '{payload}' published on {publishedOn}")]
-    static partial void OutputMessage(this ILogger logger, DateTime 
publishedOn, string payload);
+    [LoggerMessage(EventId = 1, Level = LogLevel.Information, Message = 
"Received: '{payload}' published on '{publishedOn}' with message id 
'{messageId}'")]
+    static partial void OutputMessage(this ILogger logger, string payload, 
DateTime publishedOn, MessageId messageId);
 
     /// <summary>
     /// Default logger for consumer state monitoring
diff --git a/samples/Processing/Processing.csproj 
b/samples/Processing/Processing.csproj
deleted file mode 100644
index d611fd6..0000000
--- a/samples/Processing/Processing.csproj
+++ /dev/null
@@ -1,19 +0,0 @@
-<Project Sdk="Microsoft.NET.Sdk.Worker">
-
-  <PropertyGroup>
-    <TargetFramework>net9.0</TargetFramework>
-    <Nullable>enable</Nullable>
-    <ImplicitUsings>enable</ImplicitUsings>
-    <NoWarn>CA2012</NoWarn>
-  </PropertyGroup>
-
-  <ItemGroup>
-    <PackageReference Include="Microsoft.Extensions.Hosting" Version="9.0.1" />
-  </ItemGroup>
-
-  <ItemGroup>
-    <ProjectReference Include="..\..\src\DotPulsar\DotPulsar.csproj" />
-    <ProjectReference Include="..\Extensions\Extensions.csproj" />
-  </ItemGroup>
-  
-</Project>
diff --git a/samples/Processing/Program.cs b/samples/Processing/Program.cs
deleted file mode 100644
index 273cb18..0000000
--- a/samples/Processing/Program.cs
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-using Extensions;
-using Processing;
-
-await Host
-    .CreateApplicationBuilder(args)
-    .AddHostedService<Worker>()
-    .Build()
-    .RunAsync();

Reply via email to