This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7d9351e ConnectionPool handles connecting through Pulsar proxies
7d9351e is described below
commit 7d9351eb27cf4c86958380b0a4c5aa538a8aebfc
Author: Vince Pergolizzi <[email protected]>
AuthorDate: Sat Mar 21 10:44:46 2020 -0400
ConnectionPool handles connecting through Pulsar proxies
---
src/DotPulsar/Internal/ConnectionPool.cs | 45 ++++++++++++++++++++------------
1 file changed, 29 insertions(+), 16 deletions(-)
diff --git a/src/DotPulsar/Internal/ConnectionPool.cs b/src/DotPulsar/Internal/ConnectionPool.cs
index 7c1f7b5..de178c7 100644
--- a/src/DotPulsar/Internal/ConnectionPool.cs
+++ b/src/DotPulsar/Internal/ConnectionPool.cs
@@ -69,12 +69,14 @@ namespace DotPulsar.Internal
Authoritative = false
};
- var serviceUrl = _serviceUrl;
+ var logicalUrl = _serviceUrl;
+ var physicalUrl = _serviceUrl;
while (true)
{
- var connection = await GetConnection(serviceUrl,
cancellationToken);
+ var connection = await GetConnection(logicalUrl, physicalUrl,
cancellationToken);
var response = await connection.Send(lookup,
cancellationToken);
+
response.Expect(BaseCommand.Type.LookupResponse);
if (response.LookupTopicResponse.Response ==
CommandLookupTopicResponse.LookupType.Failed)
@@ -82,15 +84,12 @@ namespace DotPulsar.Internal
lookup.Authoritative =
response.LookupTopicResponse.Authoritative;
- serviceUrl = new
Uri(GetBrokerServiceUrl(response.LookupTopicResponse));
+ logicalUrl = new
Uri(GetBrokerServiceUrl(response.LookupTopicResponse));
if (response.LookupTopicResponse.Response ==
CommandLookupTopicResponse.LookupType.Redirect ||
!response.LookupTopicResponse.Authoritative)
continue;
- if (_serviceUrl.IsLoopback) // LookupType is 'Connect', ServiceUrl is local and response is authoritative. Assume the Pulsar server is a standalone docker.
- return connection;
- else
- return await GetConnection(serviceUrl, cancellationToken);
+ return await GetConnection(logicalUrl, physicalUrl,
cancellationToken);
}
}
@@ -117,32 +116,46 @@ namespace DotPulsar.Internal
}
}
- private async ValueTask<Connection> GetConnection(Uri serviceUrl,
CancellationToken cancellationToken)
+ // The logical Url differs from the physical Url when you are
+ // connecting through a Pulsar proxy. We create 1 physical connection to
+ // the Proxy for each logical broker connection we require according to
+ // the topic lookup.
+ private async ValueTask<Connection> GetConnection(Uri logicalUrl, Uri
physicalUrl, CancellationToken cancellationToken)
{
using (await _lock.Lock(cancellationToken))
{
- if (_connections.TryGetValue(serviceUrl, out Connection
connection))
+ if (_connections.TryGetValue(logicalUrl, out Connection
connection))
return connection;
- return await EstablishNewConnection(serviceUrl,
cancellationToken);
+ return await EstablishNewConnection(logicalUrl, physicalUrl,
cancellationToken);
}
}
- private async Task<Connection> EstablishNewConnection(Uri serviceUrl,
CancellationToken cancellationToken)
+ private async Task<Connection> EstablishNewConnection(Uri logicalUrl,
Uri physicalUrl, CancellationToken cancellationToken)
{
- var stream = await _connector.Connect(serviceUrl);
+ var stream = await _connector.Connect(physicalUrl);
var connection = new Connection(new PulsarStream(stream));
DotPulsarEventSource.Log.ConnectionCreated();
- _connections[serviceUrl] = connection;
- _ =
connection.ProcessIncommingFrames(cancellationToken).ContinueWith(t =>
DisposeConnection(serviceUrl));
+ _connections[logicalUrl] = connection;
+ _ =
connection.ProcessIncommingFrames(cancellationToken).ContinueWith(t =>
DisposeConnection(logicalUrl));
+
+ if (logicalUrl != physicalUrl)
+ {
+ // DirectProxyHandler expects the Url with no scheme provided
+ _commandConnect.ProxyToBrokerUrl =
$"{logicalUrl.Host}:{logicalUrl.Port}";
+ }
+
var response = await connection.Send(_commandConnect,
cancellationToken);
response.Expect(BaseCommand.Type.Connected);
+
+ _commandConnect.ResetProxyToBrokerUrl(); // reset so we can re-use this object
+
return connection;
}
- private async ValueTask DisposeConnection(Uri serviceUrl)
+ private async ValueTask DisposeConnection(Uri logicalUrl)
{
- if (_connections.TryRemove(serviceUrl, out Connection connection))
+ if (_connections.TryRemove(logicalUrl, out var connection))
{
await connection.DisposeAsync();
DotPulsarEventSource.Log.ConnectionDisposed();