This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 28e57ca  Updated changelog for SendChannel (#131)
28e57ca is described below

commit 28e57ca4b1c0de4c7e352eee953a124c4ae46ffc
Author: Kristian Andersen <[email protected]>
AuthorDate: Fri Jan 13 12:37:39 2023 +0100

    Updated changelog for SendChannel (#131)
    
    Minor cleanup
---
 CHANGELOG.md                                |  3 +++
 src/DotPulsar/Internal/Connection.cs        |  2 +-
 src/DotPulsar/Internal/ConnectionPool.cs    |  2 +-
 src/DotPulsar/Internal/Producer.cs          | 10 +++++-----
 tests/DotPulsar.Tests/IntegrationFixture.cs |  2 +-
 5 files changed, 11 insertions(+), 8 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2a5740e..c4b9f00 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -9,6 +9,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
 ### Added
 
 - Subscription properties can be added when creating a consumer
+- Support ordered asynchronous `Send(...)` through a `SendChannel` for use cases that require very high throughput.
+  The `SendChannel` is accessible through the `IProducer<TMessage>` interface and allows the user to block
+  future `Send(...)` calls with `Complete()` and awaiting outstanding send operations with `await Completion()`.
 
 ## [2.7.0] - 2022-12-08
 
diff --git a/src/DotPulsar/Internal/Connection.cs b/src/DotPulsar/Internal/Connection.cs
index ff14a13..84bf300 100644
--- a/src/DotPulsar/Internal/Connection.cs
+++ b/src/DotPulsar/Internal/Connection.cs
@@ -297,7 +297,7 @@ public sealed class Connection : IConnection
         }
     }
 
-    public async Task ProcessIncommingFrames(CancellationToken cancellationToken)
+    public async Task ProcessIncomingFrames(CancellationToken cancellationToken)
     {
         await Task.Yield();
 
diff --git a/src/DotPulsar/Internal/ConnectionPool.cs b/src/DotPulsar/Internal/ConnectionPool.cs
index 62fb53d..59647b6 100644
--- a/src/DotPulsar/Internal/ConnectionPool.cs
+++ b/src/DotPulsar/Internal/ConnectionPool.cs
@@ -162,7 +162,7 @@ public sealed class ConnectionPool : IConnectionPool
         var connection = new Connection(new PulsarStream(stream), _keepAliveInterval, _authentication);
         DotPulsarMeter.ConnectionCreated();
         _connections[url] = connection;
-        _ = connection.ProcessIncommingFrames(_cancellationTokenSource.Token).ContinueWith(t => DisposeConnection(url));
+        _ = connection.ProcessIncomingFrames(_cancellationTokenSource.Token).ContinueWith(t => DisposeConnection(url));
         var commandConnect = _commandConnect;
 
         if (url.ProxyThroughServiceUrl)
diff --git a/src/DotPulsar/Internal/Producer.cs b/src/DotPulsar/Internal/Producer.cs
index 0b40167..824b523 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -64,14 +64,14 @@ public sealed class Producer<TMessage> : IProducer<TMessage>, IRegisterEvent
         _operationName = $"{options.Topic} send";
         _activityTags = new KeyValuePair<string, object?>[]
         {
-                new KeyValuePair<string, object?>("messaging.destination", options.Topic),
-                new KeyValuePair<string, object?>("messaging.destination_kind", "topic"),
-                new KeyValuePair<string, object?>("messaging.system", "pulsar"),
-                new KeyValuePair<string, object?>("messaging.url", serviceUrl),
+                new ("messaging.destination", options.Topic),
+                new ("messaging.destination_kind", "topic"),
+                new ("messaging.system", "pulsar"),
+                new ("messaging.url", serviceUrl),
         };
         _meterTags = new KeyValuePair<string, object?>[]
         {
-                new KeyValuePair<string, object?>("topic", options.Topic)
+                new ("topic", options.Topic)
         };
         _attachTraceInfoToMessages = options.AttachTraceInfoToMessages;
         _sequenceId = new SequenceId(options.InitialSequenceId);
diff --git a/tests/DotPulsar.Tests/IntegrationFixture.cs b/tests/DotPulsar.Tests/IntegrationFixture.cs
index 234fa57..d8f786d 100644
--- a/tests/DotPulsar.Tests/IntegrationFixture.cs
+++ b/tests/DotPulsar.Tests/IntegrationFixture.cs
@@ -62,7 +62,7 @@ public class IntegrationFixture : IAsyncLifetime
 
         _cluster = new Builder()
             .UseContainer()
-            .UseImage("apachepulsar/pulsar:2.10.0")
+            .UseImage("apachepulsar/pulsar:2.10.3")
             .WithEnvironment(environmentVariables)
             .ExposePort(Port)
             .Command("/bin/bash -c", arguments)

Reply via email to