This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 9f76862 Prettyfied the samples
9f76862 is described below
commit 9f76862485cdd187aa545df6ebfdc82739797653
Author: Daniel Blankensteiner <[email protected]>
AuthorDate: Mon Oct 28 14:34:34 2024 +0100
Prettyfied the samples
---
samples/Consuming/Consuming.csproj | 1 +
samples/Consuming/Program.cs | 67 +++++++++++--------------
samples/Producing/Producing.csproj | 1 +
samples/Producing/Program.cs | 71 ++++++++++++---------------
samples/Reading/Program.cs | 65 +++++++++++--------------
samples/Reading/Reading.csproj | 1 +
samples/SendChannel/Program.cs | 89 +++++++++++++++-------------------
samples/SendChannel/SendChannel.csproj | 9 ++--
8 files changed, 132 insertions(+), 172 deletions(-)
diff --git a/samples/Consuming/Consuming.csproj b/samples/Consuming/Consuming.csproj
index 2e5abec..efeb337 100644
--- a/samples/Consuming/Consuming.csproj
+++ b/samples/Consuming/Consuming.csproj
@@ -3,6 +3,7 @@
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
+ <ImplicitUsings>enable</ImplicitUsings>
</PropertyGroup>
<ItemGroup>
diff --git a/samples/Consuming/Program.cs b/samples/Consuming/Program.cs
index dd71b35..18a1c83 100644
--- a/samples/Consuming/Program.cs
+++ b/samples/Consuming/Program.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -12,57 +12,46 @@
* limitations under the License.
*/
-namespace Consuming;
-
using DotPulsar;
using DotPulsar.Abstractions;
using DotPulsar.Extensions;
-using System;
-using System.Threading;
-using System.Threading.Tasks;
-internal static class Program
-{
- private static async Task Main()
- {
- var cts = new CancellationTokenSource();
+var cts = new CancellationTokenSource();
- Console.CancelKeyPress += (sender, args) =>
- {
- cts.Cancel();
- args.Cancel = true;
- };
+Console.CancelKeyPress += (sender, args) =>
+{
+ cts.Cancel();
+ args.Cancel = true;
+};
- await using var client = PulsarClient.Builder().Build(); // Connecting to pulsar://localhost:6650
+await using var client = PulsarClient.Builder().Build(); // Connecting to pulsar://localhost:6650
- await using var consumer = client.NewConsumer(Schema.String)
- .StateChangedHandler(Monitor)
- .SubscriptionName("MySubscription")
- .Topic("persistent://public/default/mytopic")
- .Create();
+await using var consumer = client.NewConsumer(Schema.String)
+ .StateChangedHandler(Monitor)
+ .SubscriptionName("MySubscription")
+ .Topic("persistent://public/default/mytopic")
+ .Create();
- Console.WriteLine("Press Ctrl+C to exit");
+Console.WriteLine("Press Ctrl+C to exit");
- await ConsumeMessages(consumer, cts.Token);
- }
+await ConsumeMessages(consumer, cts.Token);
- private static async Task ConsumeMessages(IConsumer<string> consumer, CancellationToken cancellationToken)
+async Task ConsumeMessages(IConsumer<string> consumer, CancellationToken cancellationToken)
+{
+ try
{
- try
+ await foreach (var message in consumer.Messages(cancellationToken))
{
- await foreach (var message in consumer.Messages(cancellationToken))
- {
- Console.WriteLine($"Received: {message.Value()}");
- await consumer.Acknowledge(message, cancellationToken);
- }
+ Console.WriteLine($"Received: {message.Value()}");
+ await consumer.Acknowledge(message, cancellationToken);
}
- catch (OperationCanceledException) { }
}
+ catch (OperationCanceledException) { }
+}
- private static void Monitor(ConsumerStateChanged stateChanged)
- {
- var topic = stateChanged.Consumer.Topic;
- var state = stateChanged.ConsumerState;
- Console.WriteLine($"The consumer for topic '{topic}' changed state to '{state}'");
- }
+void Monitor(ConsumerStateChanged stateChanged)
+{
+ var topic = stateChanged.Consumer.Topic;
+ var state = stateChanged.ConsumerState;
+ Console.WriteLine($"The consumer for topic '{topic}' changed state to '{state}'");
}
diff --git a/samples/Producing/Producing.csproj b/samples/Producing/Producing.csproj
index 2e5abec..efeb337 100644
--- a/samples/Producing/Producing.csproj
+++ b/samples/Producing/Producing.csproj
@@ -3,6 +3,7 @@
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
+ <ImplicitUsings>enable</ImplicitUsings>
</PropertyGroup>
<ItemGroup>
diff --git a/samples/Producing/Program.cs b/samples/Producing/Program.cs
index 8416c69..7339b77 100644
--- a/samples/Producing/Program.cs
+++ b/samples/Producing/Program.cs
@@ -12,61 +12,50 @@
* limitations under the License.
*/
-namespace Producing;
-
using DotPulsar;
using DotPulsar.Abstractions;
using DotPulsar.Extensions;
-using System;
-using System.Threading;
-using System.Threading.Tasks;
-internal static class Program
+var cts = new CancellationTokenSource();
+
+Console.CancelKeyPress += (sender, args) =>
{
- private static async Task Main()
- {
- var cts = new CancellationTokenSource();
+ cts.Cancel();
+ args.Cancel = true;
+};
- Console.CancelKeyPress += (sender, args) =>
- {
- cts.Cancel();
- args.Cancel = true;
- };
+await using var client = PulsarClient.Builder().Build(); // Connecting to pulsar://localhost:6650
- await using var client = PulsarClient.Builder().Build(); // Connecting to pulsar://localhost:6650
+await using var producer = client.NewProducer(Schema.String)
+ .StateChangedHandler(Monitor)
+ .Topic("persistent://public/default/mytopic")
+ .Create();
- await using var producer = client.NewProducer(Schema.String)
- .StateChangedHandler(Monitor)
- .Topic("persistent://public/default/mytopic")
- .Create();
+Console.WriteLine("Press Ctrl+C to exit");
- Console.WriteLine("Press Ctrl+C to exit");
+await ProduceMessages(producer, cts.Token);
- await ProduceMessages(producer, cts.Token);
- }
+async Task ProduceMessages(IProducer<string> producer, CancellationToken cancellationToken)
+{
+ var delay = TimeSpan.FromSeconds(5);
- private static async Task ProduceMessages(IProducer<string> producer, CancellationToken cancellationToken)
+ try
{
- var delay = TimeSpan.FromSeconds(5);
-
- try
+ while (!cancellationToken.IsCancellationRequested)
{
- while (!cancellationToken.IsCancellationRequested)
- {
- var data = DateTime.UtcNow.ToLongTimeString();
- _ = await producer.Send(data, cancellationToken);
- Console.WriteLine($"Sent: {data}");
- await Task.Delay(delay, cancellationToken);
- }
+ var data = DateTime.UtcNow.ToLongTimeString();
+ _ = await producer.Send(data, cancellationToken);
+ Console.WriteLine($"Sent: {data}");
+ await Task.Delay(delay, cancellationToken);
}
- catch (OperationCanceledException) // If not using the cancellationToken, then just dispose the producer and catch ObjectDisposedException instead
- { }
}
+ catch (OperationCanceledException) // If not using the cancellationToken, then just dispose the producer and catch ObjectDisposedException instead
+ { }
+}
- private static void Monitor(ProducerStateChanged stateChanged)
- {
- var topic = stateChanged.Producer.Topic;
- var state = stateChanged.ProducerState;
- Console.WriteLine($"The producer for topic '{topic}' changed state to '{state}'");
- }
+void Monitor(ProducerStateChanged stateChanged)
+{
+ var topic = stateChanged.Producer.Topic;
+ var state = stateChanged.ProducerState;
+ Console.WriteLine($"The producer for topic '{topic}' changed state to '{state}'");
}
diff --git a/samples/Reading/Program.cs b/samples/Reading/Program.cs
index a0fcd61..e73f3a3 100644
--- a/samples/Reading/Program.cs
+++ b/samples/Reading/Program.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -12,56 +12,45 @@
* limitations under the License.
*/
-namespace Reading;
-
using DotPulsar;
using DotPulsar.Abstractions;
using DotPulsar.Extensions;
-using System;
-using System.Threading;
-using System.Threading.Tasks;
-internal static class Program
-{
- private static async Task Main()
- {
- var cts = new CancellationTokenSource();
+var cts = new CancellationTokenSource();
- Console.CancelKeyPress += (sender, args) =>
- {
- cts.Cancel();
- args.Cancel = true;
- };
+Console.CancelKeyPress += (sender, args) =>
+{
+ cts.Cancel();
+ args.Cancel = true;
+};
- await using var client = PulsarClient.Builder().Build(); // Connecting to pulsar://localhost:6650
+await using var client = PulsarClient.Builder().Build(); // Connecting to pulsar://localhost:6650
- await using var reader = client.NewReader(Schema.String)
- .StartMessageId(MessageId.Earliest)
- .StateChangedHandler(Monitor)
- .Topic("persistent://public/default/mytopic")
- .Create();
+await using var reader = client.NewReader(Schema.String)
+ .StartMessageId(MessageId.Earliest)
+ .StateChangedHandler(Monitor)
+ .Topic("persistent://public/default/mytopic")
+ .Create();
- Console.WriteLine("Press Ctrl+C to exit");
+Console.WriteLine("Press Ctrl+C to exit");
- await ReadMessages(reader, cts.Token);
- }
+await ReadMessages(reader, cts.Token);
- private static async Task ReadMessages(IReader<string> reader, CancellationToken cancellationToken)
+async Task ReadMessages(IReader<string> reader, CancellationToken cancellationToken)
+{
+ try
{
- try
+ await foreach (var message in reader.Messages(cancellationToken))
{
- await foreach (var message in reader.Messages(cancellationToken))
- {
- Console.WriteLine($"Received: {message.Value()}");
- }
+ Console.WriteLine($"Received: {message.Value()}");
}
- catch (OperationCanceledException) { }
}
+ catch (OperationCanceledException) { }
+}
- private static void Monitor(ReaderStateChanged stateChanged)
- {
- var topic = stateChanged.Reader.Topic;
- var state = stateChanged.ReaderState;
- Console.WriteLine($"The reader for topic '{topic}' changed state to '{state}'");
- }
+void Monitor(ReaderStateChanged stateChanged)
+{
+ var topic = stateChanged.Reader.Topic;
+ var state = stateChanged.ReaderState;
+ Console.WriteLine($"The reader for topic '{topic}' changed state to '{state}'");
}
diff --git a/samples/Reading/Reading.csproj b/samples/Reading/Reading.csproj
index 2e5abec..efeb337 100644
--- a/samples/Reading/Reading.csproj
+++ b/samples/Reading/Reading.csproj
@@ -3,6 +3,7 @@
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
+ <ImplicitUsings>enable</ImplicitUsings>
</PropertyGroup>
<ItemGroup>
diff --git a/samples/SendChannel/Program.cs b/samples/SendChannel/Program.cs
index d79f038..f498366 100644
--- a/samples/SendChannel/Program.cs
+++ b/samples/SendChannel/Program.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -12,71 +12,60 @@
* limitations under the License.
*/
-namespace SendChannel;
-
using DotPulsar;
using DotPulsar.Abstractions;
using DotPulsar.Extensions;
-using System;
-using System.Threading;
-using System.Threading.Tasks;
-internal static class Program
-{
- private static async Task Main()
- {
- var cts = new CancellationTokenSource();
+var cts = new CancellationTokenSource();
- Console.CancelKeyPress += (sender, args) =>
- {
- cts.Cancel();
- args.Cancel = true;
- };
+Console.CancelKeyPress += (sender, args) =>
+{
+ cts.Cancel();
+ args.Cancel = true;
+};
- await using var client = PulsarClient.Builder().Build(); // Connecting to pulsar://localhost:6650
+await using var client = PulsarClient.Builder().Build(); // Connecting to pulsar://localhost:6650
- await using var producer = client.NewProducer(Schema.String)
- .StateChangedHandler(Monitor)
- .Topic("persistent://public/default/mytopic")
- .Create();
+await using var producer = client.NewProducer(Schema.String)
+ .StateChangedHandler(Monitor)
+ .Topic("persistent://public/default/mytopic")
+ .Create();
- Console.WriteLine("Press Ctrl+C to exit");
+Console.WriteLine("Press Ctrl+C to exit");
- var sendChannel = producer.SendChannel;
- await ProduceMessages(sendChannel, 1000, cts.Token);
- sendChannel.Complete();
+var sendChannel = producer.SendChannel;
+await ProduceMessages(sendChannel, 1000, cts.Token);
+sendChannel.Complete();
- var shutdownCts = new CancellationTokenSource();
- shutdownCts.CancelAfter(TimeSpan.FromSeconds(30));
- await sendChannel.Completion(shutdownCts.Token);
- }
+var shutdownCts = new CancellationTokenSource();
+shutdownCts.CancelAfter(TimeSpan.FromSeconds(30));
+await sendChannel.Completion(shutdownCts.Token);
- private static async Task ProduceMessages(ISendChannel<string> sendChannel, int messages, CancellationToken cancellationToken)
+async Task ProduceMessages(ISendChannel<string> sendChannel, int messages, CancellationToken cancellationToken)
+{
+ try
{
- try
+ int i = 0;
+ while (++i <= messages && !cancellationToken.IsCancellationRequested)
{
- int i = 0;
- while (++i <= messages && !cancellationToken.IsCancellationRequested)
- {
- var data = DateTime.UtcNow.ToLongTimeString();
+ var data = DateTime.UtcNow.ToLongTimeString();
- await sendChannel.Send(data, id =>
- {
- Console.WriteLine($"Received acknowledgement for {id}");
- return ValueTask.CompletedTask;
- }, cancellationToken);
+ await sendChannel.Send(data, id =>
+ {
+ Console.WriteLine($"Received acknowledgement for {id}");
+ return ValueTask.CompletedTask;
+ }, cancellationToken);
- Console.WriteLine($"Sent: {data}");
- }
+ Console.WriteLine($"Sent: {data}");
}
- catch (OperationCanceledException) // If not using the cancellationToken, then just dispose the producer and catch ObjectDisposedException instead
- { }
}
+ catch (OperationCanceledException) // If not using the cancellationToken, then just dispose the producer and catch ObjectDisposedException instead
+ { }
+}
- private static void Monitor(ProducerStateChanged stateChanged)
- {
- var topic = stateChanged.Producer.Topic;
- var state = stateChanged.ProducerState;
- Console.WriteLine($"The producer for topic '{topic}' changed state to '{state}'");
- }
+void Monitor(ProducerStateChanged stateChanged)
+{
+ var topic = stateChanged.Producer.Topic;
+ var state = stateChanged.ProducerState;
+ Console.WriteLine($"The producer for topic '{topic}' changed state to '{state}'");
}
diff --git a/samples/SendChannel/SendChannel.csproj b/samples/SendChannel/SendChannel.csproj
index 9384afa..efeb337 100644
--- a/samples/SendChannel/SendChannel.csproj
+++ b/samples/SendChannel/SendChannel.csproj
@@ -1,12 +1,13 @@
-<Project Sdk="Microsoft.NET.Sdk">
+<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
+ <ImplicitUsings>enable</ImplicitUsings>
</PropertyGroup>
- <ItemGroup>
- <ProjectReference Include="..\..\src\DotPulsar\DotPulsar.csproj" />
- </ItemGroup>
+ <ItemGroup>
+ <ProjectReference Include="..\..\src\DotPulsar\DotPulsar.csproj" />
+ </ItemGroup>
</Project>