This is an automated email from the ASF dual-hosted git repository. curth pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push: new a66d1e176 fix(csharp/test/Drivers/Databricks): Run token exchange in a background task (#3188) a66d1e176 is described below commit a66d1e176fb1ca116a13bffc1101a55593631b2c Author: Alex Guo <133057192+alexguo...@users.noreply.github.com> AuthorDate: Tue Jul 22 15:24:55 2025 -0700 fix(csharp/test/Drivers/Databricks): Run token exchange in a background task (#3188) ## Proposed Changes - Currently, before we send an HttpRequest, we await RenewTokenIfNeededAsync, which blocks the HttpRequest on the token refresh - Make this run in a non-blocking background task, so that a token refresh is triggered but the request will continue using the existing token - When the background task completes, use the new token - This is to improve latency since requests don't wait on the token refresh - The risk is that there is a potential to use expired tokens if the background task doesn't complete in time; however, this can be mitigated by setting the TokenRenewLimit property to refresh X minutes before the token expires ## Testing `dotnet test --filter "FullyQualifiedName~TokenExchangeTests"` ``` [xUnit.net 00:00:00.00] xUnit.net VSTest Adapter v3.1.1+bf6400fd51 (64-bit .NET 8.0.7) [xUnit.net 00:00:00.04] Discovering: Apache.Arrow.Adbc.Tests.Drivers.Databricks [xUnit.net 00:00:00.12] Discovered: Apache.Arrow.Adbc.Tests.Drivers.Databricks [xUnit.net 00:00:00.13] Starting: Apache.Arrow.Adbc.Tests.Drivers.Databricks [xUnit.net 00:00:11.02] Apache.Arrow.Adbc.Tests.Drivers.Databricks.Auth.TokenExchangeTests.TokenExchangeHandler_WithValidTokenNotNearExpiry_UsesOriginalToken [SKIP] [xUnit.net 00:00:11.02] Access token is too close to expiration for this test [xUnit.net 00:00:11.34] Finished: Apache.Arrow.Adbc.Tests.Drivers.Databricks Apache.Arrow.Adbc.Tests.Drivers.Databricks test net8.0 succeeded (11.8s) Test summary: total: 3, failed: 0, succeeded: 2, skipped: 1, duration: 11.7s Build succeeded in 13.6s ``` `dotnet test 
--filter "FullyQualifiedName~TokenExchangeDelegatingHandlerTests"` ``` [xUnit.net 00:00:00.00] xUnit.net VSTest Adapter v3.1.1+bf6400fd51 (64-bit .NET 8.0.7) [xUnit.net 00:00:00.06] Discovering: Apache.Arrow.Adbc.Tests.Drivers.Databricks [xUnit.net 00:00:00.14] Discovered: Apache.Arrow.Adbc.Tests.Drivers.Databricks [xUnit.net 00:00:00.16] Starting: Apache.Arrow.Adbc.Tests.Drivers.Databricks [xUnit.net 00:00:01.76] Finished: Apache.Arrow.Adbc.Tests.Drivers.Databricks Apache.Arrow.Adbc.Tests.Drivers.Databricks test net8.0 succeeded (2.4s) Test summary: total: 14, failed: 0, succeeded: 14, skipped: 0, duration: 2.4s Build succeeded in 3.8s ``` --- .../Auth/TokenExchangeDelegatingHandler.cs | 77 +++++++++++----- .../Databricks/E2E/Auth/TokenExchangeTests.cs | 101 ++++++++++++++++++--- .../Auth/TokenExchangeDelegatingHandlerTests.cs | 73 +++++++++++++-- 3 files changed, 207 insertions(+), 44 deletions(-) diff --git a/csharp/src/Drivers/Databricks/Auth/TokenExchangeDelegatingHandler.cs b/csharp/src/Drivers/Databricks/Auth/TokenExchangeDelegatingHandler.cs index f24a9a90e..2748dc951 100644 --- a/csharp/src/Drivers/Databricks/Auth/TokenExchangeDelegatingHandler.cs +++ b/csharp/src/Drivers/Databricks/Auth/TokenExchangeDelegatingHandler.cs @@ -25,17 +25,19 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Auth { /// <summary> /// HTTP message handler that automatically refreshes OAuth tokens before they expire. + /// Uses a non-blocking approach to refresh tokens in the background. /// </summary> internal class TokenExchangeDelegatingHandler : DelegatingHandler { private readonly string _initialToken; private readonly int _tokenRenewLimitMinutes; - private readonly SemaphoreSlim _tokenLock = new SemaphoreSlim(1, 1); + private readonly object _tokenLock = new object(); private readonly ITokenExchangeClient _tokenExchangeClient; private string _currentToken; private DateTime _tokenExpiryTime; private bool _tokenExchangeAttempted = false; + private Task? 
_pendingTokenTask = null; /// <summary> /// Initializes a new instance of the <see cref="TokenExchangeDelegatingHandler"/> class. @@ -69,41 +71,54 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Auth // Only renew if: // 1. We haven't already attempted token exchange (a token can only be renewed once) // 2. The token will expire within the renewal limit + // 3. We don't already have a pending refresh task return !_tokenExchangeAttempted && - DateTime.UtcNow.AddMinutes(_tokenRenewLimitMinutes) >= _tokenExpiryTime; + DateTime.UtcNow.AddMinutes(_tokenRenewLimitMinutes) >= _tokenExpiryTime && + _pendingTokenTask == null; } /// <summary> - /// Renews the token if needed. + /// Starts token renewal in the background if needed. /// </summary> /// <param name="cancellationToken">A cancellation token.</param> - /// <returns>A task representing the asynchronous operation.</returns> - private async Task RenewTokenIfNeededAsync(CancellationToken cancellationToken) + private void StartTokenRenewalIfNeeded(CancellationToken cancellationToken) { if (!NeedsTokenRenewal()) { return; } - // Acquire the lock to ensure only one thread attempts renewal - await _tokenLock.WaitAsync(cancellationToken); - - try + bool needsRenewal; + lock (_tokenLock) { // Double-check pattern in case another thread renewed while we were waiting - if (!NeedsTokenRenewal()) + needsRenewal = NeedsTokenRenewal(); + if (needsRenewal) { - return; + // Mark that we've attempted token exchange to prevent multiple attempts + // Specifically, NeedsTokenRenewal checks this flag + _tokenExchangeAttempted = true; } + } + + if (!needsRenewal) + { + return; + } + // Start token refresh in the background + _pendingTokenTask = Task.Run(async () => + { try { - _tokenExchangeAttempted = true; - TokenExchangeResponse response = await _tokenExchangeClient.ExchangeTokenAsync(_initialToken, cancellationToken); - _currentToken = response.AccessToken; - _tokenExpiryTime = response.ExpiryTime; + // Update the token atomically 
when ready + lock (_tokenLock) + { + _currentToken = response.AccessToken; + _tokenExpiryTime = response.ExpiryTime; + } } catch (Exception ex) { @@ -111,11 +126,7 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Auth // This is to avoid interrupting the operation if token exchange fails System.Diagnostics.Debug.WriteLine($"Token exchange failed: {ex.Message}"); } - } - finally - { - _tokenLock.Release(); - } + }, cancellationToken); } /// <summary> @@ -126,8 +137,16 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Auth /// <returns>The HTTP response message.</returns> protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken) { - await RenewTokenIfNeededAsync(cancellationToken); - request.Headers.Authorization = new AuthenticationHeaderValue("Bearer", _currentToken); + StartTokenRenewalIfNeeded(cancellationToken); + + // Use the current token (which might be the old one while refresh is in progress) + string tokenToUse; + lock (_tokenLock) + { + tokenToUse = _currentToken; + } + + request.Headers.Authorization = new AuthenticationHeaderValue("Bearer", tokenToUse); return await base.SendAsync(request, cancellationToken); } @@ -135,7 +154,19 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Auth { if (disposing) { - _tokenLock.Dispose(); + // Wait for any pending token task to complete to avoid leaking tasks + if (_pendingTokenTask != null) + { + try + { + // Try to wait for the task to complete, but don't block indefinitely + _pendingTokenTask.Wait(TimeSpan.FromSeconds(10)); + } + catch (Exception ex) + { + System.Diagnostics.Debug.WriteLine($"Exception during token task cleanup: {ex.Message}"); + } + } } base.Dispose(disposing); diff --git a/csharp/test/Drivers/Databricks/E2E/Auth/TokenExchangeTests.cs b/csharp/test/Drivers/Databricks/E2E/Auth/TokenExchangeTests.cs index b2e988be1..c574abe0c 100644 --- a/csharp/test/Drivers/Databricks/E2E/Auth/TokenExchangeTests.cs +++ 
b/csharp/test/Drivers/Databricks/E2E/Auth/TokenExchangeTests.cs @@ -16,6 +16,7 @@ */ using System; +using System.Collections.Generic; using System.Net.Http; using System.Threading; using System.Threading.Tasks; @@ -25,6 +26,28 @@ using Xunit.Abstractions; namespace Apache.Arrow.Adbc.Tests.Drivers.Databricks.Auth { + public class TokenCapturingHandler : DelegatingHandler + { + public List<string> CapturedTokens { get; } = new List<string>(); + public List<DateTime> RequestTimes { get; } = new List<DateTime>(); + + public TokenCapturingHandler(HttpMessageHandler innerHandler) : base(innerHandler) + { + } + + protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken) + { + // Capture the authorization token + if (request.Headers.Authorization != null) + { + CapturedTokens.Add(request.Headers.Authorization.Parameter ?? string.Empty); + RequestTimes.Add(DateTime.UtcNow); + } + + return await base.SendAsync(request, cancellationToken); + } + } + public class TokenExchangeTests : TestBase<DatabricksTestConfiguration, DatabricksTestEnvironment>, IDisposable { private readonly HttpClient _httpClient; @@ -80,7 +103,7 @@ namespace Apache.Arrow.Adbc.Tests.Drivers.Databricks.Auth } [SkippableFact] - public async Task TokenExchangeHandler_WithValidToken_RefreshesToken() + public async Task TokenExchangeHandler_WithValidToken_RefreshesTokenInBackgroundAcrossRequests() { Skip.IfNot(!string.IsNullOrEmpty(TestConfiguration.AccessToken), "OAuth access token not configured"); @@ -93,8 +116,11 @@ namespace Apache.Arrow.Adbc.Tests.Drivers.Databricks.Auth string host = GetHost(); var tokenExchangeClient = new TokenExchangeClient(_httpClient, host); + // Create a token capturing handler to intercept the actual tokens being sent + var tokenCapturingHandler = new TokenCapturingHandler(new HttpClientHandler()); + var handler = new TokenExchangeDelegatingHandler( - new HttpClientHandler(), + tokenCapturingHandler, 
tokenExchangeClient, TestConfiguration.AccessToken, nearFutureExpiry, @@ -102,14 +128,40 @@ namespace Apache.Arrow.Adbc.Tests.Drivers.Databricks.Auth var httpClient = new HttpClient(handler); - var request = new HttpRequestMessage(HttpMethod.Get, $"https://{host}/api/2.0/sql/config/warehouses"); - var response = await httpClient.SendAsync(request, CancellationToken.None); + // First request - should trigger background token refresh but use original token + var firstRequest = new HttpRequestMessage(HttpMethod.Get, $"https://{host}/api/2.0/sql/config/warehouses"); + var startTime = DateTime.UtcNow; + var firstResponse = await httpClient.SendAsync(firstRequest, CancellationToken.None); + var firstRequestDuration = DateTime.UtcNow - startTime; + + // The first request should succeed quickly (not waiting for token refresh) + firstResponse.EnsureSuccessStatusCode(); + string firstContent = await firstResponse.Content.ReadAsStringAsync(); + Assert.Contains("sql_configuration_parameters", firstContent); + + // Verify the request completed quickly (token refresh happens in background) + Assert.True(firstRequestDuration < TimeSpan.FromSeconds(5), + $"First request took {firstRequestDuration.TotalMilliseconds}ms, which may indicate it waited for token refresh"); + + // Verify the first request used the original token + Assert.Single(tokenCapturingHandler.CapturedTokens); + + // Wait for background token refresh to complete + await Task.Delay(TimeSpan.FromSeconds(10)); - // The request should succeed with the refreshed token - response.EnsureSuccessStatusCode(); + // Second request - should use the refreshed token + var secondRequest = new HttpRequestMessage(HttpMethod.Get, $"https://{host}/api/2.0/sql/config/warehouses"); + var secondResponse = await httpClient.SendAsync(secondRequest, CancellationToken.None); - string content = await response.Content.ReadAsStringAsync(); - Assert.Contains("sql_configuration_parameters", content); + // The second request should also succeed 
(now with refreshed token) + secondResponse.EnsureSuccessStatusCode(); + string secondContent = await secondResponse.Content.ReadAsStringAsync(); + Assert.Contains("sql_configuration_parameters", secondContent); + + // Verify we now have two different tokens + Assert.Equal(2, tokenCapturingHandler.CapturedTokens.Count); + Assert.Equal(TestConfiguration.AccessToken, tokenCapturingHandler.CapturedTokens[0]); + Assert.NotEqual(TestConfiguration.AccessToken, tokenCapturingHandler.CapturedTokens[1]); } [SkippableFact] @@ -124,9 +176,12 @@ namespace Apache.Arrow.Adbc.Tests.Drivers.Databricks.Auth string host = GetHost(); var tokenExchangeClient = new TokenExchangeClient(_httpClient, host); + // Create a token capturing handler to verify no token refresh occurs + var tokenCapturingHandler = new TokenCapturingHandler(new HttpClientHandler()); + // Create a handler that should not refresh the token (token not near expiry) var handler = new TokenExchangeDelegatingHandler( - new HttpClientHandler(), + tokenCapturingHandler, tokenExchangeClient, TestConfiguration.AccessToken, expiryTime, @@ -134,14 +189,30 @@ namespace Apache.Arrow.Adbc.Tests.Drivers.Databricks.Auth var httpClient = new HttpClient(handler); - var request = new HttpRequestMessage(HttpMethod.Get, $"https://{host}/api/2.0/sql/config/warehouses"); - var response = await httpClient.SendAsync(request, CancellationToken.None); + // Make multiple requests to ensure no token refresh is triggered + var firstRequest = new HttpRequestMessage(HttpMethod.Get, $"https://{host}/api/2.0/sql/config/warehouses"); + var firstResponse = await httpClient.SendAsync(firstRequest, CancellationToken.None); + + // The first request should succeed with the original token + firstResponse.EnsureSuccessStatusCode(); + string firstContent = await firstResponse.Content.ReadAsStringAsync(); + Assert.Contains("sql_configuration_parameters", firstContent); + + // Similar wait as the token refresh case + await 
Task.Delay(TimeSpan.FromSeconds(10)); + + // Second request should also use original token (no refresh needed) + var secondRequest = new HttpRequestMessage(HttpMethod.Get, $"https://{host}/api/2.0/sql/config/warehouses"); + var secondResponse = await httpClient.SendAsync(secondRequest, CancellationToken.None); - // The request should succeed with the original token - response.EnsureSuccessStatusCode(); + secondResponse.EnsureSuccessStatusCode(); + string secondContent = await secondResponse.Content.ReadAsStringAsync(); + Assert.Contains("sql_configuration_parameters", secondContent); - string content = await response.Content.ReadAsStringAsync(); - Assert.Contains("sql_configuration_parameters", content); + // Verify both requests used the same original token (no refresh occurred) + Assert.Equal(2, tokenCapturingHandler.CapturedTokens.Count); + Assert.Equal(TestConfiguration.AccessToken, tokenCapturingHandler.CapturedTokens[0]); + Assert.Equal(TestConfiguration.AccessToken, tokenCapturingHandler.CapturedTokens[1]); } protected override void Dispose(bool disposing) diff --git a/csharp/test/Drivers/Databricks/Unit/Auth/TokenExchangeDelegatingHandlerTests.cs b/csharp/test/Drivers/Databricks/Unit/Auth/TokenExchangeDelegatingHandlerTests.cs index 1abf6dd7d..40b83c156 100644 --- a/csharp/test/Drivers/Databricks/Unit/Auth/TokenExchangeDelegatingHandlerTests.cs +++ b/csharp/test/Drivers/Databricks/Unit/Auth/TokenExchangeDelegatingHandlerTests.cs @@ -108,18 +108,22 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Tests.Auth Assert.Equal("Bearer", capturedRequest.Headers.Authorization?.Scheme); Assert.Equal(_initialToken, capturedRequest.Headers.Authorization?.Parameter); + // Wait for background task to complete + await Task.Delay(100); + _mockTokenExchangeClient.Verify( x => x.ExchangeTokenAsync(It.IsAny<string>(), It.IsAny<CancellationToken>()), Times.Never); } [Fact] - public async Task SendAsync_WithTokenNearExpiry_RenewsTokenBeforeRequest() + public async Task 
SendAsync_WithTokenNearExpiry_StartsTokenRenewalInBackground() { // Arrange var nearExpiryTime = DateTime.UtcNow.AddMinutes(5); // Within renewal limit var newToken = "new-renewed-token"; var newExpiry = DateTime.UtcNow.AddHours(1); + var tokenExchangeDelay = TimeSpan.FromMilliseconds(500); var handler = new TokenExchangeDelegatingHandler( _mockInnerHandler.Object, @@ -141,7 +145,11 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Tests.Auth _mockTokenExchangeClient .Setup(x => x.ExchangeTokenAsync(_initialToken, It.IsAny<CancellationToken>())) - .ReturnsAsync(tokenExchangeResponse); + .Returns(async (string token, CancellationToken ct) => + { + await Task.Delay(tokenExchangeDelay, ct); + return tokenExchangeResponse; + }); HttpRequestMessage? capturedRequest = null; @@ -154,12 +162,40 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Tests.Auth .ReturnsAsync(expectedResponse); var httpClient = new HttpClient(handler); + + // Make the first request - this should use the original token and start background refresh + var startTime = DateTime.UtcNow; var response = await httpClient.SendAsync(request); + var requestDuration = DateTime.UtcNow - startTime; Assert.Equal(expectedResponse, response); + Assert.True(requestDuration < tokenExchangeDelay, + $"Request took {requestDuration.TotalMilliseconds}ms, which is longer than the token refresh delay of {tokenExchangeDelay.TotalMilliseconds}ms"); + Assert.NotNull(capturedRequest); Assert.Equal("Bearer", capturedRequest.Headers.Authorization?.Scheme); - Assert.Equal(newToken, capturedRequest.Headers.Authorization?.Parameter); + Assert.Equal(_initialToken, capturedRequest.Headers.Authorization?.Parameter); // First request uses original token + + // Wait a bit for the background task to complete + await Task.Delay(tokenExchangeDelay + TimeSpan.FromMilliseconds(100)); + + // Make a second request - this should use the new token + var request2 = new HttpRequestMessage(HttpMethod.Get, "https://example.com/2"); + 
HttpRequestMessage? capturedRequest2 = null; + + _mockInnerHandler.Protected() + .Setup<Task<HttpResponseMessage>>( + "SendAsync", + ItExpr.Is<HttpRequestMessage>(r => r.RequestUri!.PathAndQuery == "/2"), + ItExpr.IsAny<CancellationToken>()) + .Callback<HttpRequestMessage, CancellationToken>((req, ct) => capturedRequest2 = req) + .ReturnsAsync(new HttpResponseMessage(HttpStatusCode.OK)); + + await httpClient.SendAsync(request2); + + Assert.NotNull(capturedRequest2); + Assert.Equal("Bearer", capturedRequest2.Headers.Authorization?.Scheme); + Assert.Equal(newToken, capturedRequest2.Headers.Authorization?.Parameter); // Second request uses new token _mockTokenExchangeClient.Verify( x => x.ExchangeTokenAsync(_initialToken, It.IsAny<CancellationToken>()), @@ -204,6 +240,9 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Tests.Auth Assert.Equal("Bearer", capturedRequest.Headers.Authorization?.Scheme); Assert.Equal(_initialToken, capturedRequest.Headers.Authorization?.Parameter); // Should still use original token + // Wait for background task to complete + await Task.Delay(100); + // Verify token exchange was attempted _mockTokenExchangeClient.Verify( x => x.ExchangeTokenAsync(_initialToken, It.IsAny<CancellationToken>()), @@ -245,8 +284,13 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Tests.Auth var httpClient = new HttpClient(handler); - // Make two requests + // Make first request to trigger token renewal await httpClient.SendAsync(new HttpRequestMessage(HttpMethod.Get, "https://example.com/1")); + + // Wait for background renewal to complete + await Task.Delay(100); + + // Make second request await httpClient.SendAsync(new HttpRequestMessage(HttpMethod.Get, "https://example.com/2")); // Token exchange should only be called once (renewed tokens cannot be renewed again) @@ -282,7 +326,7 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Tests.Auth .Setup(x => x.ExchangeTokenAsync(_initialToken, It.IsAny<CancellationToken>())) .Returns(async () => { - await 
Task.Delay(100); + await Task.Delay(200); return tokenExchangeResponse; }); @@ -305,6 +349,9 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Tests.Auth await Task.WhenAll(tasks); + // Wait for any background token renewal to complete + await Task.Delay(300); + // Token exchange should only be called once despite concurrent requests _mockTokenExchangeClient.Verify( x => x.ExchangeTokenAsync(_initialToken, It.IsAny<CancellationToken>()), @@ -342,7 +389,7 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Tests.Auth } [Fact] - public async Task SendAsync_WithTokenRenewalAndCancellation_PropagatesCancellation() + public async Task SendAsync_WithTokenRenewalAndCancellation_HandlesCancellationGracefully() { var nearExpiryTime = DateTime.UtcNow.AddMinutes(5); // Within renewal limit var handler = new TokenExchangeDelegatingHandler( @@ -369,6 +416,17 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Tests.Auth }); }); + _mockInnerHandler.Protected() + .Setup<Task<HttpResponseMessage>>( + "SendAsync", + ItExpr.IsAny<HttpRequestMessage>(), + ItExpr.IsAny<CancellationToken>()) + .Returns<HttpRequestMessage, CancellationToken>((req, ct) => + { + ct.ThrowIfCancellationRequested(); + return Task.FromResult(new HttpResponseMessage(HttpStatusCode.OK)); + }); + cts.Cancel(); var httpClient = new HttpClient(handler); await Assert.ThrowsAnyAsync<OperationCanceledException>(() => @@ -425,6 +483,9 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Tests.Auth var httpClient = new HttpClient(handler); await httpClient.SendAsync(request); + // Wait for background renewal to complete + await Task.Delay(100); + _mockTokenExchangeClient.Verify( x => x.ExchangeTokenAsync(_initialToken, It.IsAny<CancellationToken>()), Times.Once);