This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new be4cb70  Move send channel sample to "Producing" project (#251)
be4cb70 is described below

commit be4cb7031586f7466e933b5b6e19c4e35bbdfdeb
Author: Kristian Andersen <[email protected]>
AuthorDate: Fri Feb 7 10:29:19 2025 +0100

    Move send channel sample to "Producing" project (#251)
---
 DotPulsar.sln                          |  7 ----
 samples/Producing/Program.cs           |  1 +
 samples/Producing/SendChannelWorker.cs | 62 +++++++++++++++++++++++++++++
 samples/SendChannel/Program.cs         | 73 ----------------------------------
 samples/SendChannel/SendChannel.csproj | 13 ------
 5 files changed, 63 insertions(+), 93 deletions(-)

diff --git a/DotPulsar.sln b/DotPulsar.sln
index 834d99e..0a6377b 100644
--- a/DotPulsar.sln
+++ b/DotPulsar.sln
@@ -28,8 +28,6 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = 
"Compression", "benchmarks\C
 EndProject
 Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Benchmarks", 
"Benchmarks", "{2C57AF4B-0D23-42D7-86FE-80277FD52875}"
 EndProject
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SendChannel", 
"samples\SendChannel\SendChannel.csproj", 
"{366ACA6A-7DA2-4E51-AC27-7B570DFFE5D2}"
-EndProject
 Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Extensions", 
"samples\Extensions\Extensions.csproj", "{F211960B-F2C6-4878-B1AD-72D63CDB3B7A}"
 EndProject
 Global
@@ -62,10 +60,6 @@ Global
                {040F8253-074D-4977-BDB1-0D9798B52CE2}.Debug|Any CPU.Build.0 = 
Debug|Any CPU
                {040F8253-074D-4977-BDB1-0D9798B52CE2}.Release|Any 
CPU.ActiveCfg = Release|Any CPU
                {040F8253-074D-4977-BDB1-0D9798B52CE2}.Release|Any CPU.Build.0 
= Release|Any CPU
-               {366ACA6A-7DA2-4E51-AC27-7B570DFFE5D2}.Debug|Any CPU.ActiveCfg 
= Debug|Any CPU
-               {366ACA6A-7DA2-4E51-AC27-7B570DFFE5D2}.Debug|Any CPU.Build.0 = 
Debug|Any CPU
-               {366ACA6A-7DA2-4E51-AC27-7B570DFFE5D2}.Release|Any 
CPU.ActiveCfg = Release|Any CPU
-               {366ACA6A-7DA2-4E51-AC27-7B570DFFE5D2}.Release|Any CPU.Build.0 
= Release|Any CPU
                {F211960B-F2C6-4878-B1AD-72D63CDB3B7A}.Debug|Any CPU.ActiveCfg 
= Debug|Any CPU
                {F211960B-F2C6-4878-B1AD-72D63CDB3B7A}.Debug|Any CPU.Build.0 = 
Debug|Any CPU
                {F211960B-F2C6-4878-B1AD-72D63CDB3B7A}.Release|Any 
CPU.ActiveCfg = Release|Any CPU
@@ -80,7 +74,6 @@ Global
                {14934BED-A222-47B2-A58A-CFC4AAB89B49} = 
{E7106D0F-B255-4631-9FB8-734FC5748FA9}
                {6D44683B-865C-4D15-9F0A-1A8441354589} = 
{E7106D0F-B255-4631-9FB8-734FC5748FA9}
                {040F8253-074D-4977-BDB1-0D9798B52CE2} = 
{2C57AF4B-0D23-42D7-86FE-80277FD52875}
-               {366ACA6A-7DA2-4E51-AC27-7B570DFFE5D2} = 
{E7106D0F-B255-4631-9FB8-734FC5748FA9}
                {F211960B-F2C6-4878-B1AD-72D63CDB3B7A} = 
{E7106D0F-B255-4631-9FB8-734FC5748FA9}
        EndGlobalSection
        GlobalSection(ExtensibilityGlobals) = postSolution
diff --git a/samples/Producing/Program.cs b/samples/Producing/Program.cs
index 276ed24..a3f20c8 100644
--- a/samples/Producing/Program.cs
+++ b/samples/Producing/Program.cs
@@ -17,6 +17,7 @@ using Producing;
 
 await Host
     .CreateApplicationBuilder(args)
+    //.AddHostedService<SendChannelWorker>()
     .AddHostedService<SendWorker>()
     .Build()
     .RunAsync();
diff --git a/samples/Producing/SendChannelWorker.cs 
b/samples/Producing/SendChannelWorker.cs
new file mode 100644
index 0000000..bebf992
--- /dev/null
+++ b/samples/Producing/SendChannelWorker.cs
@@ -0,0 +1,62 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Producing;
+
+using DotPulsar;
+using DotPulsar.Extensions;
+using Extensions;
+
+public class SendChannelWorker : BackgroundService
+{
+    private readonly ILogger _logger;
+
+    public SendChannelWorker(ILogger<SendChannelWorker> logger) => _logger = 
logger;
+
+    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
+    {
+        await using var client = PulsarClient.Builder()
+            .ExceptionHandler(_logger.PulsarClientException) // Optional
+            .Build();                                        // Connecting to 
pulsar://localhost:6650
+
+        await using var producer = client.NewProducer(Schema.String)
+            .StateChangedHandler(_logger.ProducerChangedState) // Optional
+            .Topic("persistent://public/default/mytopic")
+            .Create();
+
+        var sendChannel = producer.SendChannel;
+
+        var delay = TimeSpan.FromSeconds(5);
+
+        _logger.LogInformation($"Will start sending messages every 
{delay.TotalSeconds} seconds");
+
+        while (!stoppingToken.IsCancellationRequested)
+        {
+            var data = DateTime.UtcNow.ToLongTimeString();
+            await sendChannel.Send(data, id =>
+            {
+                _logger.LogInformation($"Received acknowledgement message with 
content: '{data}' and got message id: '{id}'");
+                return ValueTask.CompletedTask;
+            }, stoppingToken);
+            _logger.LogInformation($"Sent message with content: '{data}'");
+            await Task.Delay(delay, stoppingToken);
+        }
+        sendChannel.Complete();
+
+        // Wait up to 5 seconds for in flight messages to be delivered before 
closing
+        var shutdownCts = new CancellationTokenSource();
+        shutdownCts.CancelAfter(TimeSpan.FromSeconds(5));
+        await sendChannel.Completion(shutdownCts.Token);
+    }
+}
diff --git a/samples/SendChannel/Program.cs b/samples/SendChannel/Program.cs
deleted file mode 100644
index c27d3dd..0000000
--- a/samples/SendChannel/Program.cs
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-using DotPulsar;
-using DotPulsar.Abstractions;
-using DotPulsar.Extensions;
-
-var cts = new CancellationTokenSource();
-
-Console.CancelKeyPress += (sender, args) =>
-{
-    cts.Cancel();
-    args.Cancel = true;
-};
-
-await using var client = PulsarClient
-    .Builder()
-    .ExceptionHandler(ExceptionHandler)
-    .Build(); // Connecting to pulsar://localhost:6650
-
-await using var producer = client.NewProducer(Schema.String)
-    .StateChangedHandler(Monitor)
-    .Topic("persistent://public/default/mytopic")
-    .Create();
-
-Console.WriteLine("Press Ctrl+C to exit");
-
-var sendChannel = producer.SendChannel;
-await ProduceMessages(sendChannel, 1000, cts.Token);
-sendChannel.Complete();
-
-var shutdownCts = new CancellationTokenSource();
-shutdownCts.CancelAfter(TimeSpan.FromSeconds(30));
-await sendChannel.Completion(shutdownCts.Token);
-
-async Task ProduceMessages(ISendChannel<string> sendChannel, int messages, 
CancellationToken cancellationToken)
-{
-    try
-    {
-        int i = 0;
-        while (++i <= messages && !cancellationToken.IsCancellationRequested)
-        {
-            var data = DateTime.UtcNow.ToLongTimeString();
-
-            await sendChannel.Send(data, id =>
-            {
-                Console.WriteLine($"Received acknowledgement for {id}");
-                return ValueTask.CompletedTask;
-            }, cancellationToken);
-
-            Console.WriteLine($"Sent: {data}");
-        }
-    }
-    catch (OperationCanceledException) // If not using the cancellationToken, 
then just dispose the producer and catch ObjectDisposedException instead
-    { }
-}
-
-void ExceptionHandler(ExceptionContext context) =>
-    Console.WriteLine($"The PulsarClient got an exception: 
{context.Exception}");
-
-void Monitor(ProducerStateChanged stateChanged) =>
-    Console.WriteLine($"The producer for topic '{stateChanged.Producer.Topic}' 
changed state to '{stateChanged.ProducerState}'");
diff --git a/samples/SendChannel/SendChannel.csproj 
b/samples/SendChannel/SendChannel.csproj
deleted file mode 100644
index 1fd8f8f..0000000
--- a/samples/SendChannel/SendChannel.csproj
+++ /dev/null
@@ -1,13 +0,0 @@
-<Project Sdk="Microsoft.NET.Sdk">
-
-  <PropertyGroup>
-    <OutputType>Exe</OutputType>
-    <TargetFramework>net9.0</TargetFramework>
-    <ImplicitUsings>enable</ImplicitUsings>
-  </PropertyGroup>
-
-  <ItemGroup>
-    <ProjectReference Include="..\..\src\DotPulsar\DotPulsar.csproj" />
-  </ItemGroup>
-
-</Project>

Reply via email to