This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 28e57ca Updated changelog for SendChannel (#131)
28e57ca is described below
commit 28e57ca4b1c0de4c7e352eee953a124c4ae46ffc
Author: Kristian Andersen <[email protected]>
AuthorDate: Fri Jan 13 12:37:39 2023 +0100
Updated changelog for SendChannel (#131)
Minor cleanup
---
CHANGELOG.md | 3 +++
src/DotPulsar/Internal/Connection.cs | 2 +-
src/DotPulsar/Internal/ConnectionPool.cs | 2 +-
src/DotPulsar/Internal/Producer.cs | 10 +++++-----
tests/DotPulsar.Tests/IntegrationFixture.cs | 2 +-
5 files changed, 11 insertions(+), 8 deletions(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2a5740e..c4b9f00 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -9,6 +9,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
### Added
- Subscription properties can be added when creating a consumer
+- Support ordered asynchronous `Send(...)` through a `SendChannel` for use cases that require very high throughput.
+ The `SendChannel` is accessible through the `IProducer<TMessage>` interface and allows the user to block
+ future `Send(...)` calls with `Complete()` and awaiting outstanding send operations with `await Completion()`.
## [2.7.0] - 2022-12-08
diff --git a/src/DotPulsar/Internal/Connection.cs b/src/DotPulsar/Internal/Connection.cs
index ff14a13..84bf300 100644
--- a/src/DotPulsar/Internal/Connection.cs
+++ b/src/DotPulsar/Internal/Connection.cs
@@ -297,7 +297,7 @@ public sealed class Connection : IConnection
}
}
- public async Task ProcessIncommingFrames(CancellationToken cancellationToken)
+ public async Task ProcessIncomingFrames(CancellationToken cancellationToken)
{
await Task.Yield();
diff --git a/src/DotPulsar/Internal/ConnectionPool.cs b/src/DotPulsar/Internal/ConnectionPool.cs
index 62fb53d..59647b6 100644
--- a/src/DotPulsar/Internal/ConnectionPool.cs
+++ b/src/DotPulsar/Internal/ConnectionPool.cs
@@ -162,7 +162,7 @@ public sealed class ConnectionPool : IConnectionPool
var connection = new Connection(new PulsarStream(stream), _keepAliveInterval, _authentication);
DotPulsarMeter.ConnectionCreated();
_connections[url] = connection;
- _ = connection.ProcessIncommingFrames(_cancellationTokenSource.Token).ContinueWith(t => DisposeConnection(url));
+ _ = connection.ProcessIncomingFrames(_cancellationTokenSource.Token).ContinueWith(t => DisposeConnection(url));
var commandConnect = _commandConnect;
if (url.ProxyThroughServiceUrl)
diff --git a/src/DotPulsar/Internal/Producer.cs b/src/DotPulsar/Internal/Producer.cs
index 0b40167..824b523 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -64,14 +64,14 @@ public sealed class Producer<TMessage> : IProducer<TMessage>, IRegisterEvent
_operationName = $"{options.Topic} send";
_activityTags = new KeyValuePair<string, object?>[]
{
- new KeyValuePair<string, object?>("messaging.destination", options.Topic),
- new KeyValuePair<string, object?>("messaging.destination_kind", "topic"),
- new KeyValuePair<string, object?>("messaging.system", "pulsar"),
- new KeyValuePair<string, object?>("messaging.url", serviceUrl),
+ new ("messaging.destination", options.Topic),
+ new ("messaging.destination_kind", "topic"),
+ new ("messaging.system", "pulsar"),
+ new ("messaging.url", serviceUrl),
};
_meterTags = new KeyValuePair<string, object?>[]
{
- new KeyValuePair<string, object?>("topic", options.Topic)
+ new ("topic", options.Topic)
};
_attachTraceInfoToMessages = options.AttachTraceInfoToMessages;
_sequenceId = new SequenceId(options.InitialSequenceId);
diff --git a/tests/DotPulsar.Tests/IntegrationFixture.cs b/tests/DotPulsar.Tests/IntegrationFixture.cs
index 234fa57..d8f786d 100644
--- a/tests/DotPulsar.Tests/IntegrationFixture.cs
+++ b/tests/DotPulsar.Tests/IntegrationFixture.cs
@@ -62,7 +62,7 @@ public class IntegrationFixture : IAsyncLifetime
_cluster = new Builder()
.UseContainer()
- .UseImage("apachepulsar/pulsar:2.10.0")
+ .UseImage("apachepulsar/pulsar:2.10.3")
.WithEnvironment(environmentVariables)
.ExposePort(Port)
.Command("/bin/bash -c", arguments)