This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7d9351e  ConnectionPool handles connecting through Pulsar proxies
7d9351e is described below

commit 7d9351eb27cf4c86958380b0a4c5aa538a8aebfc
Author: Vince Pergolizzi <[email protected]>
AuthorDate: Sat Mar 21 10:44:46 2020 -0400

    ConnectionPool handles connecting through Pulsar proxies
---
 src/DotPulsar/Internal/ConnectionPool.cs | 45 ++++++++++++++++++++------------
 1 file changed, 29 insertions(+), 16 deletions(-)

diff --git a/src/DotPulsar/Internal/ConnectionPool.cs 
b/src/DotPulsar/Internal/ConnectionPool.cs
index 7c1f7b5..de178c7 100644
--- a/src/DotPulsar/Internal/ConnectionPool.cs
+++ b/src/DotPulsar/Internal/ConnectionPool.cs
@@ -69,12 +69,14 @@ namespace DotPulsar.Internal
                 Authoritative = false
             };
 
-            var serviceUrl = _serviceUrl;
+            var logicalUrl = _serviceUrl;
+            var physicalUrl = _serviceUrl;
 
             while (true)
             {
-                var connection = await GetConnection(serviceUrl, 
cancellationToken);
+                var connection = await GetConnection(logicalUrl, physicalUrl, 
cancellationToken);
                 var response = await connection.Send(lookup, 
cancellationToken);
+
                 response.Expect(BaseCommand.Type.LookupResponse);
 
                 if (response.LookupTopicResponse.Response == 
CommandLookupTopicResponse.LookupType.Failed)
@@ -82,15 +84,12 @@ namespace DotPulsar.Internal
 
                 lookup.Authoritative = 
response.LookupTopicResponse.Authoritative;
 
-                serviceUrl = new 
Uri(GetBrokerServiceUrl(response.LookupTopicResponse));
+                logicalUrl = new 
Uri(GetBrokerServiceUrl(response.LookupTopicResponse));
 
                 if (response.LookupTopicResponse.Response == 
CommandLookupTopicResponse.LookupType.Redirect || 
!response.LookupTopicResponse.Authoritative)
                     continue;
 
-                if (_serviceUrl.IsLoopback) // LookupType is 'Connect', 
ServiceUrl is local and response is authoritative. Assume the Pulsar server is 
a standalone docker.
-                    return connection;
-                else
-                    return await GetConnection(serviceUrl, cancellationToken);
+                return await GetConnection(logicalUrl, physicalUrl, 
cancellationToken);
             }
         }
 
@@ -117,32 +116,46 @@ namespace DotPulsar.Internal
             }
         }
 
-        private async ValueTask<Connection> GetConnection(Uri serviceUrl, 
CancellationToken cancellationToken)
+        // The logical Url differs from the physical Url when you are
+        // connecting through a Pulsar proxy. We create 1 physical connection 
to
+        // the Proxy for each logical broker connection we require according to
+        // the topic lookup.
+        private async ValueTask<Connection> GetConnection(Uri logicalUrl, Uri 
physicalUrl, CancellationToken cancellationToken)
         {
             using (await _lock.Lock(cancellationToken))
             {
-                if (_connections.TryGetValue(serviceUrl, out Connection 
connection))
+                if (_connections.TryGetValue(logicalUrl, out Connection 
connection))
                     return connection;
 
-                return await EstablishNewConnection(serviceUrl, 
cancellationToken);
+                return await EstablishNewConnection(logicalUrl, physicalUrl, 
cancellationToken);
             }
         }
 
-        private async Task<Connection> EstablishNewConnection(Uri serviceUrl, 
CancellationToken cancellationToken)
+        private async Task<Connection> EstablishNewConnection(Uri logicalUrl, 
Uri physicalUrl, CancellationToken cancellationToken)
         {
-            var stream = await _connector.Connect(serviceUrl);
+            var stream = await _connector.Connect(physicalUrl);
             var connection = new Connection(new PulsarStream(stream));
             DotPulsarEventSource.Log.ConnectionCreated();
-            _connections[serviceUrl] = connection;
-            _ = 
connection.ProcessIncommingFrames(cancellationToken).ContinueWith(t => 
DisposeConnection(serviceUrl));
+            _connections[logicalUrl] = connection;
+            _ = 
connection.ProcessIncommingFrames(cancellationToken).ContinueWith(t => 
DisposeConnection(logicalUrl));
+
+            if (logicalUrl != physicalUrl)
+            {
+                // DirectProxyHandler expects the Url with no scheme provided
+                _commandConnect.ProxyToBrokerUrl = 
$"{logicalUrl.Host}:{logicalUrl.Port}";
+            }
+
             var response = await connection.Send(_commandConnect, 
cancellationToken);
             response.Expect(BaseCommand.Type.Connected);
+
+            _commandConnect.ResetProxyToBrokerUrl(); // reset so we can re-use 
this object
+
             return connection;
         }
 
-        private async ValueTask DisposeConnection(Uri serviceUrl)
+        private async ValueTask DisposeConnection(Uri logicalUrl)
         {
-            if (_connections.TryRemove(serviceUrl, out Connection connection))
+            if (_connections.TryRemove(logicalUrl, out var connection))
             {
                 await connection.DisposeAsync();
                 DotPulsarEventSource.Log.ConnectionDisposed();

Reply via email to