This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new be4cb70 Move send channel sample to "Producing" project (#251)
be4cb70 is described below
commit be4cb7031586f7466e933b5b6e19c4e35bbdfdeb
Author: Kristian Andersen <[email protected]>
AuthorDate: Fri Feb 7 10:29:19 2025 +0100
Move send channel sample to "Producing" project (#251)
---
DotPulsar.sln | 7 ----
samples/Producing/Program.cs | 1 +
samples/Producing/SendChannelWorker.cs | 62 +++++++++++++++++++++++++++++
samples/SendChannel/Program.cs | 73 ----------------------------------
samples/SendChannel/SendChannel.csproj | 13 ------
5 files changed, 63 insertions(+), 93 deletions(-)
diff --git a/DotPulsar.sln b/DotPulsar.sln
index 834d99e..0a6377b 100644
--- a/DotPulsar.sln
+++ b/DotPulsar.sln
@@ -28,8 +28,6 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") =
"Compression", "benchmarks\C
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Benchmarks",
"Benchmarks", "{2C57AF4B-0D23-42D7-86FE-80277FD52875}"
EndProject
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SendChannel",
"samples\SendChannel\SendChannel.csproj",
"{366ACA6A-7DA2-4E51-AC27-7B570DFFE5D2}"
-EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Extensions",
"samples\Extensions\Extensions.csproj", "{F211960B-F2C6-4878-B1AD-72D63CDB3B7A}"
EndProject
Global
@@ -62,10 +60,6 @@ Global
{040F8253-074D-4977-BDB1-0D9798B52CE2}.Debug|Any CPU.Build.0 =
Debug|Any CPU
{040F8253-074D-4977-BDB1-0D9798B52CE2}.Release|Any
CPU.ActiveCfg = Release|Any CPU
{040F8253-074D-4977-BDB1-0D9798B52CE2}.Release|Any CPU.Build.0
= Release|Any CPU
- {366ACA6A-7DA2-4E51-AC27-7B570DFFE5D2}.Debug|Any CPU.ActiveCfg
= Debug|Any CPU
- {366ACA6A-7DA2-4E51-AC27-7B570DFFE5D2}.Debug|Any CPU.Build.0 =
Debug|Any CPU
- {366ACA6A-7DA2-4E51-AC27-7B570DFFE5D2}.Release|Any
CPU.ActiveCfg = Release|Any CPU
- {366ACA6A-7DA2-4E51-AC27-7B570DFFE5D2}.Release|Any CPU.Build.0
= Release|Any CPU
{F211960B-F2C6-4878-B1AD-72D63CDB3B7A}.Debug|Any CPU.ActiveCfg
= Debug|Any CPU
{F211960B-F2C6-4878-B1AD-72D63CDB3B7A}.Debug|Any CPU.Build.0 =
Debug|Any CPU
{F211960B-F2C6-4878-B1AD-72D63CDB3B7A}.Release|Any
CPU.ActiveCfg = Release|Any CPU
@@ -80,7 +74,6 @@ Global
{14934BED-A222-47B2-A58A-CFC4AAB89B49} =
{E7106D0F-B255-4631-9FB8-734FC5748FA9}
{6D44683B-865C-4D15-9F0A-1A8441354589} =
{E7106D0F-B255-4631-9FB8-734FC5748FA9}
{040F8253-074D-4977-BDB1-0D9798B52CE2} =
{2C57AF4B-0D23-42D7-86FE-80277FD52875}
- {366ACA6A-7DA2-4E51-AC27-7B570DFFE5D2} =
{E7106D0F-B255-4631-9FB8-734FC5748FA9}
{F211960B-F2C6-4878-B1AD-72D63CDB3B7A} =
{E7106D0F-B255-4631-9FB8-734FC5748FA9}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
diff --git a/samples/Producing/Program.cs b/samples/Producing/Program.cs
index 276ed24..a3f20c8 100644
--- a/samples/Producing/Program.cs
+++ b/samples/Producing/Program.cs
@@ -17,6 +17,7 @@ using Producing;
await Host
.CreateApplicationBuilder(args)
+ //.AddHostedService<SendChannelWorker>()
.AddHostedService<SendWorker>()
.Build()
.RunAsync();
diff --git a/samples/Producing/SendChannelWorker.cs
b/samples/Producing/SendChannelWorker.cs
new file mode 100644
index 0000000..bebf992
--- /dev/null
+++ b/samples/Producing/SendChannelWorker.cs
@@ -0,0 +1,62 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Producing;
+
+using DotPulsar;
+using DotPulsar.Extensions;
+using Extensions;
+
+public class SendChannelWorker : BackgroundService
+{
+ private readonly ILogger _logger;
+
+ public SendChannelWorker(ILogger<SendChannelWorker> logger) => _logger =
logger;
+
+ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
+ {
+ await using var client = PulsarClient.Builder()
+ .ExceptionHandler(_logger.PulsarClientException) // Optional
+ .Build(); // Connecting to
pulsar://localhost:6650
+
+ await using var producer = client.NewProducer(Schema.String)
+ .StateChangedHandler(_logger.ProducerChangedState) // Optional
+ .Topic("persistent://public/default/mytopic")
+ .Create();
+
+ var sendChannel = producer.SendChannel;
+
+ var delay = TimeSpan.FromSeconds(5);
+
+ _logger.LogInformation($"Will start sending messages every
{delay.TotalSeconds} seconds");
+
+ while (!stoppingToken.IsCancellationRequested)
+ {
+ var data = DateTime.UtcNow.ToLongTimeString();
+ await sendChannel.Send(data, id =>
+ {
+ _logger.LogInformation($"Received acknowledgement message with
content: '{data}' and got message id: '{id}'");
+ return ValueTask.CompletedTask;
+ }, stoppingToken);
+ _logger.LogInformation($"Sent message with content: '{data}'");
+ await Task.Delay(delay, stoppingToken);
+ }
+ sendChannel.Complete();
+
+ // Wait up to 5 seconds for in flight messages to be delivered before
closing
+ var shutdownCts = new CancellationTokenSource();
+ shutdownCts.CancelAfter(TimeSpan.FromSeconds(5));
+ await sendChannel.Completion(shutdownCts.Token);
+ }
+}
diff --git a/samples/SendChannel/Program.cs b/samples/SendChannel/Program.cs
deleted file mode 100644
index c27d3dd..0000000
--- a/samples/SendChannel/Program.cs
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-using DotPulsar;
-using DotPulsar.Abstractions;
-using DotPulsar.Extensions;
-
-var cts = new CancellationTokenSource();
-
-Console.CancelKeyPress += (sender, args) =>
-{
- cts.Cancel();
- args.Cancel = true;
-};
-
-await using var client = PulsarClient
- .Builder()
- .ExceptionHandler(ExceptionHandler)
- .Build(); // Connecting to pulsar://localhost:6650
-
-await using var producer = client.NewProducer(Schema.String)
- .StateChangedHandler(Monitor)
- .Topic("persistent://public/default/mytopic")
- .Create();
-
-Console.WriteLine("Press Ctrl+C to exit");
-
-var sendChannel = producer.SendChannel;
-await ProduceMessages(sendChannel, 1000, cts.Token);
-sendChannel.Complete();
-
-var shutdownCts = new CancellationTokenSource();
-shutdownCts.CancelAfter(TimeSpan.FromSeconds(30));
-await sendChannel.Completion(shutdownCts.Token);
-
-async Task ProduceMessages(ISendChannel<string> sendChannel, int messages,
CancellationToken cancellationToken)
-{
- try
- {
- int i = 0;
- while (++i <= messages && !cancellationToken.IsCancellationRequested)
- {
- var data = DateTime.UtcNow.ToLongTimeString();
-
- await sendChannel.Send(data, id =>
- {
- Console.WriteLine($"Received acknowledgement for {id}");
- return ValueTask.CompletedTask;
- }, cancellationToken);
-
- Console.WriteLine($"Sent: {data}");
- }
- }
- catch (OperationCanceledException) // If not using the cancellationToken,
then just dispose the producer and catch ObjectDisposedException instead
- { }
-}
-
-void ExceptionHandler(ExceptionContext context) =>
- Console.WriteLine($"The PulsarClient got an exception:
{context.Exception}");
-
-void Monitor(ProducerStateChanged stateChanged) =>
- Console.WriteLine($"The producer for topic '{stateChanged.Producer.Topic}'
changed state to '{stateChanged.ProducerState}'");
diff --git a/samples/SendChannel/SendChannel.csproj
b/samples/SendChannel/SendChannel.csproj
deleted file mode 100644
index 1fd8f8f..0000000
--- a/samples/SendChannel/SendChannel.csproj
+++ /dev/null
@@ -1,13 +0,0 @@
-<Project Sdk="Microsoft.NET.Sdk">
-
- <PropertyGroup>
- <OutputType>Exe</OutputType>
- <TargetFramework>net9.0</TargetFramework>
- <ImplicitUsings>enable</ImplicitUsings>
- </PropertyGroup>
-
- <ItemGroup>
- <ProjectReference Include="..\..\src\DotPulsar\DotPulsar.csproj" />
- </ItemGroup>
-
-</Project>