This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 3018d07 Make ready for release 3.0.1
3018d07 is described below
commit 3018d07603dbd76eb7efcac2fcfd44aec7990b2d
Author: Daniel Blankensteiner <[email protected]>
AuthorDate: Fri Sep 15 10:36:21 2023 +0200
Make ready for release 3.0.1
---
CHANGELOG.md | 7 ++-
src/DotPulsar/DotPulsar.csproj | 2 +-
tests/DotPulsar.Tests/ConsumerTests.cs | 80 +++++++++++++++-------------------
tests/DotPulsar.Tests/ReaderTests.cs | 70 ++++++++++++-----------------
4 files changed, 65 insertions(+), 94 deletions(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2dd7a71..1247972 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,16 +4,15 @@ All notable changes to this project will be documented in
this file.
The format is based on [Keep a
Changelog](https://keepachangelog.com/en/1.1.0/) and this project adheres to
[Semantic Versioning](https://semver.org/spec/v2.0.0.html).
-## [Unreleased]
+## [3.0.1] - 2023-09-15
### Changed
-- When calling GetLastMessageId(s) on a Reader or Consumer, it returns a
MessageId without the topic field if
- MessageId.Earliest is found.
+- When calling GetLastMessageId(s) on a Reader or Consumer, it returns a
MessageId without the topic field if MessageId.Earliest is found
### Fixed
-- Fixed issue with DotPulsar client not handling connection faults for
consumers and readers.
+- Fixed issue with the DotPulsar client not handling connection faults for
consumers and readers
## [3.0.0] - 2023-08-30
diff --git a/src/DotPulsar/DotPulsar.csproj b/src/DotPulsar/DotPulsar.csproj
index 36c6bb4..0e8fbd3 100644
--- a/src/DotPulsar/DotPulsar.csproj
+++ b/src/DotPulsar/DotPulsar.csproj
@@ -2,7 +2,7 @@
<PropertyGroup>
<TargetFrameworks>netstandard2.0;netstandard2.1;net6.0;net7.0</TargetFrameworks>
- <Version>3.0.0</Version>
+ <Version>3.0.1</Version>
<AssemblyVersion>$(Version)</AssemblyVersion>
<FileVersion>$(Version)</FileVersion>
<Authors>ApachePulsar,DanskeCommodities,dblank</Authors>
diff --git a/tests/DotPulsar.Tests/ConsumerTests.cs
b/tests/DotPulsar.Tests/ConsumerTests.cs
index 4c63a51..6460dbe 100644
--- a/tests/DotPulsar.Tests/ConsumerTests.cs
+++ b/tests/DotPulsar.Tests/ConsumerTests.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -208,26 +208,15 @@ public class ConsumerTests
{
//Arrange
var semaphoreSlim = new SemaphoreSlim(1);
- await using var
- client = PulsarClient.Builder().ExceptionHandler(context =>
- {
- semaphoreSlim.WaitAsync();
- context.Result = FaultAction.Rethrow;
- context.ExceptionHandled = true;
- })
- .ServiceUrl(new Uri("pulsar://localhost:9512")) //point to a
cluster that does not exists.
- .Build();
-
- await using var consumer = client.NewConsumer(Schema.String)
- .StateChangedHandler(changed =>
- {
- var topic = changed.Consumer.Topic;
- var state = changed.ConsumerState;
- _testOutputHelper.WriteLine($"The consumer for topic '{topic}'
changed state to '{state}'");
- })
- .SubscriptionName("MySubscription")
- .Topic("persistent://public/default/mytopic")
- .Create();
+ await using var client =
PulsarClient.Builder().ExceptionHandler(context =>
+ {
+ semaphoreSlim.WaitAsync();
+ context.Result = FaultAction.Rethrow;
+ context.ExceptionHandled = true;
+ })
+ .ServiceUrl(new Uri("pulsar://nosuchhost")).Build();
+
+ await using var consumer = CreateConsumer(client,
SubscriptionInitialPosition.Earliest, "persistent://a/b/c", "cn", "sn");
var receiveTask = consumer.Receive().AsTask();
semaphoreSlim.Release();
@@ -243,29 +232,16 @@ public class ConsumerTests
public async Task
Receive_WhenFaultedBeforeInvokingReceive_ShouldThrowConsumerFaultedException()
{
//Arrange
- var cts = new CancellationTokenSource();
-
- await using var
- client = PulsarClient.Builder().ExceptionHandler(context =>
- {
- context.Result = FaultAction.Rethrow;
- context.ExceptionHandled = true;
- })
- .ServiceUrl(new Uri("pulsar://localhost:9512")) //point to a
cluster that does not exists.
- .Build();
-
- await using var consumer = client.NewConsumer(Schema.String)
- .StateChangedHandler(changed =>
- {
- var topic = changed.Consumer.Topic;
- var state = changed.ConsumerState;
- _testOutputHelper.WriteLine($"The consumer for topic '{topic}'
changed state to '{state}'");
- })
- .SubscriptionName("MySubscription")
- .Topic("persistent://public/default/mytopic")
- .Create();
+ await using var client =
PulsarClient.Builder().ExceptionHandler(context =>
+ {
+ context.Result = FaultAction.Rethrow;
+ context.ExceptionHandled = true;
+ })
+ .ServiceUrl(new Uri("pulsar://nosuchhost")).Build();
+
+ await using var consumer = CreateConsumer(client,
SubscriptionInitialPosition.Earliest, "persistent://a/b/c", "cn", "sn");
- await consumer.OnStateChangeTo(ConsumerState.Faulted, cts.Token);
+ await consumer.OnStateChangeTo(ConsumerState.Faulted);
//Act
var exception = await Record.ExceptionAsync(() =>
consumer.Receive().AsTask());
@@ -305,18 +281,30 @@ public class ConsumerTests
return messageIds;
}
- private IProducer<string> CreateProducer(IPulsarClient pulsarClient,
string topicName) => pulsarClient.NewProducer(Schema.String)
+ private void LogState(ConsumerStateChanged stateChange)
+ => _testOutputHelper.WriteLine($"The consumer for topic
'{stateChange.Consumer.Topic}' changed state to '{stateChange.ConsumerState}'");
+
+ private void LogState(ProducerStateChanged stateChange)
+ => _testOutputHelper.WriteLine($"The producer for topic
'{stateChange.Producer.Topic}' changed state to '{stateChange.ProducerState}'");
+
+ private IProducer<string> CreateProducer(IPulsarClient pulsarClient,
string topicName)
+ => pulsarClient.NewProducer(Schema.String)
.Topic(topicName)
+ .StateChangedHandler(LogState)
.Create();
- private IConsumer<string> CreateConsumer(IPulsarClient pulsarClient,
SubscriptionInitialPosition subscriptionInitialPosition,
+ private IConsumer<string> CreateConsumer(
+ IPulsarClient pulsarClient,
+ SubscriptionInitialPosition subscriptionInitialPosition,
string topicName,
string consumerName,
- string subscriptionName) => pulsarClient.NewConsumer(Schema.String)
+ string subscriptionName)
+ => pulsarClient.NewConsumer(Schema.String)
.ConsumerName(consumerName)
.InitialPosition(subscriptionInitialPosition)
.SubscriptionName(subscriptionName)
.Topic(topicName)
+ .StateChangedHandler(LogState)
.Create();
private IPulsarClient CreateClient()
diff --git a/tests/DotPulsar.Tests/ReaderTests.cs
b/tests/DotPulsar.Tests/ReaderTests.cs
index 6a595d6..a2c546f 100644
--- a/tests/DotPulsar.Tests/ReaderTests.cs
+++ b/tests/DotPulsar.Tests/ReaderTests.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -210,26 +210,15 @@ public class ReaderTests
{
//Arrange
var semaphoreSlim = new SemaphoreSlim(1);
- await using var
- client = PulsarClient.Builder().ExceptionHandler(context =>
- {
- semaphoreSlim.WaitAsync();
- context.Result = FaultAction.Rethrow;
- context.ExceptionHandled = true;
- })
- .ServiceUrl(new Uri("pulsar://localhost:9512")) //point to a
cluster that does not exists.
- .Build();
-
- await using var reader = client.NewReader(Schema.String)
- .StartMessageId(MessageId.Earliest)
- .StateChangedHandler(changed =>
- {
- var topic = changed.Reader.Topic;
- var state = changed.ReaderState;
- _testOutputHelper.WriteLine($"The consumer for topic '{topic}'
changed state to '{state}'");
- })
- .Topic("persistent://public/default/mytopic")
- .Create();
+ await using var client =
PulsarClient.Builder().ExceptionHandler(context =>
+ {
+ semaphoreSlim.WaitAsync();
+ context.Result = FaultAction.Rethrow;
+ context.ExceptionHandled = true;
+ })
+ .ServiceUrl(new Uri("pulsar://nosuchhost")).Build();
+
+ await using var reader = CreateReader(client, MessageId.Earliest,
"persistent://a/b/c");
var receiveTask = reader.Receive().AsTask();
semaphoreSlim.Release();
@@ -245,29 +234,16 @@ public class ReaderTests
public async Task
Receive_WhenFaultedBeforeInvokingReceive_ShouldThrowConsumerFaultedException()
{
//Arrange
- var cts = new CancellationTokenSource();
-
- await using var
- client = PulsarClient.Builder().ExceptionHandler(context =>
- {
- context.Result = FaultAction.Rethrow;
- context.ExceptionHandled = true;
- })
- .ServiceUrl(new Uri("pulsar://localhost:9512")) //point to a
cluster that does not exists.
- .Build();
-
- await using var reader = client.NewReader(Schema.String)
- .StartMessageId(MessageId.Earliest)
- .StateChangedHandler(changed =>
- {
- var topic = changed.Reader.Topic;
- var state = changed.ReaderState;
- _testOutputHelper.WriteLine($"The reader for topic '{topic}'
changed state to '{state}'");
- })
- .Topic("persistent://public/default/mytopic")
- .Create();
+ await using var client =
PulsarClient.Builder().ExceptionHandler(context =>
+ {
+ context.Result = FaultAction.Rethrow;
+ context.ExceptionHandled = true;
+ })
+ .ServiceUrl(new Uri("pulsar://nosuchhost")).Build();
+
+ await using var reader = CreateReader(client, MessageId.Earliest,
"persistent://a/b/c");
- await reader.OnStateChangeTo(ReaderState.Faulted, cts.Token);
+ await reader.OnStateChangeTo(ReaderState.Faulted);
//Act
var exception = await Record.ExceptionAsync(() =>
reader.Receive().AsTask());
@@ -276,13 +252,21 @@ public class ReaderTests
exception.Should().BeOfType<ReaderFaultedException>();
}
+ private void LogState(ReaderStateChanged stateChange)
+ => _testOutputHelper.WriteLine($"The reader for topic
'{stateChange.Reader.Topic}' changed state to '{stateChange.ReaderState}'");
+
+ private void LogState(ProducerStateChanged stateChange)
+ => _testOutputHelper.WriteLine($"The producer for topic
'{stateChange.Producer.Topic}' changed state to '{stateChange.ProducerState}'");
+
private IProducer<String> CreateProducer(IPulsarClient pulsarClient,
string topicName) => pulsarClient.NewProducer(Schema.String)
.Topic(topicName)
+ .StateChangedHandler(LogState)
.Create();
private IReader<String> CreateReader(IPulsarClient pulsarClient, MessageId
messageId, string topicName) => pulsarClient.NewReader(Schema.String)
.StartMessageId(messageId)
.Topic(topicName)
+ .StateChangedHandler(LogState)
.Create();
private IPulsarClient CreateClient()