This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push: new c17043f Revert support for proxies and make ready for 0.8.4 c17043f is described below commit c17043f2a4d7000a68a313f0ca96b5c69625f3c9 Author: Daniel Blankensteiner <d...@vmail.dk> AuthorDate: Wed Mar 25 23:23:19 2020 +0100 Revert support for proxies and make ready for 0.8.4 --- .gitignore | 3 --- src/DotPulsar/DotPulsar.csproj | 4 +-- src/DotPulsar/Internal/ConnectionPool.cs | 45 ++++++++++++-------------------- 3 files changed, 18 insertions(+), 34 deletions(-) diff --git a/.gitignore b/.gitignore index 4590ca5..35c74e6 100644 --- a/.gitignore +++ b/.gitignore @@ -287,6 +287,3 @@ Icon Network Trash Folder Temporary Items .apdisk - -### Others ### -FodyWeavers.xsd \ No newline at end of file diff --git a/src/DotPulsar/DotPulsar.csproj b/src/DotPulsar/DotPulsar.csproj index bb74643..9b9211b 100644 --- a/src/DotPulsar/DotPulsar.csproj +++ b/src/DotPulsar/DotPulsar.csproj @@ -2,7 +2,7 @@ <PropertyGroup> <TargetFrameworks>netstandard2.0;netstandard2.1</TargetFrameworks> - <Version>0.8.3</Version> + <Version>0.8.4</Version> <AssemblyVersion>$(Version)</AssemblyVersion> <FileVersion>$(Version)</FileVersion> <Authors>DanskeCommodities;dblank</Authors> @@ -11,7 +11,7 @@ <Title>DotPulsar</Title> <PackageTags>Apache;Pulsar</PackageTags> <PackageLicenseExpression>Apache-2.0</PackageLicenseExpression> - <PackageReleaseNotes>Beta release - Support connecting via a proxy and a minor bug fix</PackageReleaseNotes> + <PackageReleaseNotes>Beta release - Revert support for proxies. 
Will be added again later</PackageReleaseNotes> <Description>.NET/C# client library for Apache Pulsar</Description> <GenerateDocumentationFile>true</GenerateDocumentationFile> <PublishRepositoryUrl>true</PublishRepositoryUrl> diff --git a/src/DotPulsar/Internal/ConnectionPool.cs b/src/DotPulsar/Internal/ConnectionPool.cs index c4159de..8e5bc8c 100644 --- a/src/DotPulsar/Internal/ConnectionPool.cs +++ b/src/DotPulsar/Internal/ConnectionPool.cs @@ -69,14 +69,12 @@ namespace DotPulsar.Internal Authoritative = false }; - var logicalUrl = _serviceUrl; - var physicalUrl = _serviceUrl; + var serviceUrl = _serviceUrl; while (true) { - var connection = await GetConnection(logicalUrl, physicalUrl, cancellationToken).ConfigureAwait(false); + var connection = await GetConnection(serviceUrl, cancellationToken).ConfigureAwait(false); var response = await connection.Send(lookup, cancellationToken).ConfigureAwait(false); - response.Expect(BaseCommand.Type.LookupResponse); if (response.LookupTopicResponse.Response == CommandLookupTopicResponse.LookupType.Failed) @@ -84,12 +82,15 @@ namespace DotPulsar.Internal lookup.Authoritative = response.LookupTopicResponse.Authoritative; - logicalUrl = new Uri(GetBrokerServiceUrl(response.LookupTopicResponse)); + serviceUrl = new Uri(GetBrokerServiceUrl(response.LookupTopicResponse)); if (response.LookupTopicResponse.Response == CommandLookupTopicResponse.LookupType.Redirect || !response.LookupTopicResponse.Authoritative) continue; - return await GetConnection(logicalUrl, physicalUrl, cancellationToken).ConfigureAwait(false); + if (_serviceUrl.IsLoopback) // LookupType is 'Connect', ServiceUrl is local and response is authoritative. Assume the Pulsar server is a standalone docker. 
+ return connection; + else + return await GetConnection(serviceUrl, cancellationToken).ConfigureAwait(false); } } @@ -116,46 +117,32 @@ namespace DotPulsar.Internal } } - // The logical Url differs from the physical Url when you are - // connecting through a Pulsar proxy. We create 1 physical connection to - // the Proxy for each logical broker connection we require according to - // the topic lookup. - private async ValueTask<Connection> GetConnection(Uri logicalUrl, Uri physicalUrl, CancellationToken cancellationToken) + private async ValueTask<Connection> GetConnection(Uri serviceUrl, CancellationToken cancellationToken) { using (await _lock.Lock(cancellationToken).ConfigureAwait(false)) { - if (_connections.TryGetValue(logicalUrl, out Connection connection)) + if (_connections.TryGetValue(serviceUrl, out Connection connection)) return connection; - return await EstablishNewConnection(logicalUrl, physicalUrl, cancellationToken).ConfigureAwait(false); + return await EstablishNewConnection(serviceUrl, cancellationToken).ConfigureAwait(false); } } - private async Task<Connection> EstablishNewConnection(Uri logicalUrl, Uri physicalUrl, CancellationToken cancellationToken) + private async Task<Connection> EstablishNewConnection(Uri serviceUrl, CancellationToken cancellationToken) { - var stream = await _connector.Connect(physicalUrl).ConfigureAwait(false); + var stream = await _connector.Connect(serviceUrl).ConfigureAwait(false); var connection = new Connection(new PulsarStream(stream)); DotPulsarEventSource.Log.ConnectionCreated(); - _connections[logicalUrl] = connection; - _ = connection.ProcessIncommingFrames(cancellationToken).ContinueWith(t => DisposeConnection(logicalUrl)); - - if (logicalUrl != physicalUrl) - { - // DirectProxyHandler expects the Url with no scheme provided - _commandConnect.ProxyToBrokerUrl = $"{logicalUrl.Host}:{logicalUrl.Port}"; - } - + _connections[serviceUrl] = connection; + _ = 
connection.ProcessIncommingFrames(cancellationToken).ContinueWith(t => DisposeConnection(serviceUrl)); var response = await connection.Send(_commandConnect, cancellationToken).ConfigureAwait(false); response.Expect(BaseCommand.Type.Connected); - - _commandConnect.ResetProxyToBrokerUrl(); // reset so we can re-use this object - return connection; } - private async ValueTask DisposeConnection(Uri logicalUrl) + private async ValueTask DisposeConnection(Uri serviceUrl) { - if (_connections.TryRemove(logicalUrl, out var connection)) + if (_connections.TryRemove(serviceUrl, out Connection connection)) { await connection.DisposeAsync().ConfigureAwait(false); DotPulsarEventSource.Log.ConnectionDisposed();