This is an automated email from the ASF dual-hosted git repository.
curth pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new 9a1df7631 fix(csharp/src/Drivers/Databricks): Make mandatory token
exchange blocking and fix concurrent handling (#3715)
9a1df7631 is described below
commit 9a1df763188b2a9ff05726e2930e1cc3d9e4dc5b
Author: eric-wang-1990 <[email protected]>
AuthorDate: Mon Nov 17 12:19:17 2025 -0800
fix(csharp/src/Drivers/Databricks): Make mandatory token exchange blocking
and fix concurrent handling (#3715)
## Summary
- Changed token exchange from a non-blocking background task to an awaited
operation that completes before the request is sent
- Fixed concurrent token exchange handling for multiple different tokens
- Ensures requests use the exchanged Databricks token instead of the
original token that would fail authentication
- Simplified implementation with proper concurrent request handling
## Problem
The previous non-blocking implementation had two issues:
1. **First request would fail**: The handler would send the first
request with the original non-Databricks token (e.g., Azure AD, AWS IAM)
while starting token exchange in the background. Since this is
**mandatory** token exchange, the non-Databricks token would fail
authentication with Databricks.
2. **Different tokens not handled correctly**: When multiple requests
came in with different tokens, the second token would not be exchanged
properly if it arrived during the first token's exchange. The second
request would wait for the first token's exchange, then return without
starting its own exchange.
## Solution
- Token exchange now blocks the request until completion using direct
`await`
- **Serialize token exchanges**: When a request with a different token
arrives during an ongoing exchange, it waits for that exchange to
complete, then starts its own exchange
- Removed `Task.Run` background thread offloading - exchange happens on
the calling thread
- Removed `_pendingTokenTask` tracking complexity
- Removed `Dispose` method (no longer needed without background tasks)
- Removed `_lastSeenToken` check from `NeedsTokenExchange` to avoid race
conditions
- Falls back to original token if exchange fails, with improved error
message
- **Single-token cache** prevents unbounded memory growth (only the most
recent token is cached)
## Implementation Details
The key insight is to **serialize all token exchanges**:
```csharp
// 1. Wait for any ongoing exchange to complete
//    (could be for a different token)
if (_pendingExchange != null)
{
await _pendingExchange;
}
// 2. Check if our token was already processed
if (_lastSeenToken == bearerToken)
{
return; // Already exchanged
}
// 3. Start exchange for our token
_pendingExchange = DoExchangeAsync(bearerToken, cancellationToken);
await _pendingExchange;
```
This ensures:
- Only one exchange runs at a time
- Each different token gets exchanged
- Concurrent requests with the same token share the exchange
- Shared state (`_lastSeenToken`, `_pendingExchange`, `_currentToken`) is
only updated while holding `_tokenLock`
## Test plan
- [x] Test with external token to verify the first request blocks until
the token exchange completes
- [x] Verify subsequent requests reuse the cached exchanged token
- [x] Test with different external tokens to verify each is exchanged
properly
- [x] Test concurrent requests with the same token to verify only one
exchange happens
- [x] Test token exchange failure scenario to confirm fallback behavior
- [x] All 11 unit tests pass
🤖 Generated with [Claude Code](https://claude.com/claude-code)
---------
Co-authored-by: Claude <[email protected]>
---
.../MandatoryTokenExchangeDelegatingHandler.cs | 125 ++++++++++-----------
...MandatoryTokenExchangeDelegatingHandlerTests.cs | 41 ++++---
2 files changed, 88 insertions(+), 78 deletions(-)
diff --git
a/csharp/src/Drivers/Databricks/Auth/MandatoryTokenExchangeDelegatingHandler.cs
b/csharp/src/Drivers/Databricks/Auth/MandatoryTokenExchangeDelegatingHandler.cs
index edda03f53..e571d45a1 100644
---
a/csharp/src/Drivers/Databricks/Auth/MandatoryTokenExchangeDelegatingHandler.cs
+++
b/csharp/src/Drivers/Databricks/Auth/MandatoryTokenExchangeDelegatingHandler.cs
@@ -25,7 +25,8 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Auth
{
/// <summary>
/// HTTP message handler that performs mandatory token exchange for
non-Databricks tokens.
- /// Uses a non-blocking approach to exchange tokens in the background.
+ /// Blocks requests while exchanging tokens to ensure the exchanged token
is used.
+ /// Falls back to the original token if the exchange fails.
/// </summary>
internal class MandatoryTokenExchangeDelegatingHandler : DelegatingHandler
{
@@ -34,8 +35,7 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Auth
private readonly ITokenExchangeClient _tokenExchangeClient;
private string? _currentToken;
private string? _lastSeenToken;
-
- protected Task? _pendingTokenTask = null;
+ private Task? _pendingExchange = null;
/// <summary>
/// Initializes a new instance of the <see
cref="MandatoryTokenExchangeDelegatingHandler"/> class.
@@ -59,18 +59,6 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Auth
/// <returns>True if token exchange is needed, false
otherwise.</returns>
private bool NeedsTokenExchange(string bearerToken)
{
- // If we already started exchange for this token, no need to check
again
- if (_lastSeenToken == bearerToken)
- {
- return false;
- }
-
- // If we already have a pending token task, don't start another
exchange
- if (_pendingTokenTask != null)
- {
- return false;
- }
-
// If we can't parse the token as JWT, default to use existing
token
if (!JwtTokenDecoder.TryGetIssuer(bearerToken, out string issuer))
{
@@ -86,56 +74,87 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Auth
}
/// <summary>
- /// Starts token exchange in the background if needed.
+ /// Performs token exchange if needed.
/// </summary>
/// <param name="bearerToken">The bearer token to potentially
exchange.</param>
/// <param name="cancellationToken">A cancellation token.</param>
- private void StartTokenExchangeIfNeeded(string bearerToken,
CancellationToken cancellationToken)
+ private async Task PerformTokenExchangeIfNeeded(string bearerToken,
CancellationToken cancellationToken)
{
- if (_lastSeenToken == bearerToken)
+ // Check if we need exchange (no lock needed for this check)
+ bool needsExchange = NeedsTokenExchange(bearerToken);
+
+ if (!needsExchange)
{
+ lock (_tokenLock)
+ {
+ _lastSeenToken = bearerToken;
+ }
return;
}
- bool needsExchange;
+ // Wait for any pending exchange to complete first (could be for a
different token)
+ Task? exchangeToAwait = null;
lock (_tokenLock)
{
- needsExchange = NeedsTokenExchange(bearerToken);
-
- _lastSeenToken = bearerToken;
+ if (_pendingExchange != null)
+ {
+ exchangeToAwait = _pendingExchange;
+ }
}
- if (!needsExchange)
+ if (exchangeToAwait != null)
{
- return;
+ await exchangeToAwait;
}
- // Start token exchange in the background
- _pendingTokenTask = Task.Run(async () =>
+ // Now check if we need to exchange our token
+ lock (_tokenLock)
{
- try
+ // If this token was already processed (by us or another
concurrent request)
+ if (_lastSeenToken == bearerToken)
{
- TokenExchangeResponse response = await
_tokenExchangeClient.ExchangeTokenAsync(
- bearerToken,
- _identityFederationClientId,
- cancellationToken);
-
- lock (_tokenLock)
- {
- _currentToken = response.AccessToken;
- }
+ return;
}
- catch (Exception ex)
+
+ // Start new exchange for our token
+ _lastSeenToken = bearerToken;
+ _pendingExchange = DoExchangeAsync(bearerToken,
cancellationToken);
+ exchangeToAwait = _pendingExchange;
+ }
+
+ await exchangeToAwait;
+ }
+
+ /// <summary>
+ /// Performs the actual token exchange operation.
+ /// </summary>
+ /// <param name="bearerToken">The bearer token to exchange.</param>
+ /// <param name="cancellationToken">A cancellation token.</param>
+ private async Task DoExchangeAsync(string bearerToken,
CancellationToken cancellationToken)
+ {
+ try
+ {
+ TokenExchangeResponse response = await
_tokenExchangeClient.ExchangeTokenAsync(
+ bearerToken,
+ _identityFederationClientId,
+ cancellationToken);
+
+ lock (_tokenLock)
{
- System.Diagnostics.Debug.WriteLine($"Mandatory token
exchange failed: {ex.Message}");
+ _currentToken = response.AccessToken;
}
- }, cancellationToken).ContinueWith(_ =>
+ }
+ catch (Exception ex)
+ {
+ System.Diagnostics.Debug.WriteLine($"Mandatory token exchange
failed: {ex.Message}. Continuing with original token.");
+ }
+ finally
{
lock (_tokenLock)
{
- _pendingTokenTask = null;
+ _pendingExchange = null;
}
- }, TaskScheduler.Default);
+ }
}
/// <summary>
@@ -149,7 +168,7 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Auth
string? bearerToken = request.Headers.Authorization?.Parameter;
if (!string.IsNullOrEmpty(bearerToken))
{
- StartTokenExchangeIfNeeded(bearerToken!, cancellationToken);
+ await PerformTokenExchangeIfNeeded(bearerToken!,
cancellationToken);
string tokenToUse;
lock (_tokenLock)
@@ -163,27 +182,5 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Auth
return await base.SendAsync(request, cancellationToken);
}
- protected override void Dispose(bool disposing)
- {
- if (disposing)
- {
- // Wait for any pending token task to complete to avoid
leaking tasks
- if (_pendingTokenTask != null)
- {
- try
- {
- // Try to wait for the task to complete, but don't
block indefinitely
- _pendingTokenTask.Wait(TimeSpan.FromSeconds(10));
- }
- catch (Exception ex)
- {
- // Log any exceptions during disposal
- System.Diagnostics.Debug.WriteLine($"Exception during
token task cleanup: {ex.Message}");
- }
- }
- }
-
- base.Dispose(disposing);
- }
}
}
diff --git
a/csharp/test/Drivers/Databricks/Unit/Auth/MandatoryTokenExchangeDelegatingHandlerTests.cs
b/csharp/test/Drivers/Databricks/Unit/Auth/MandatoryTokenExchangeDelegatingHandlerTests.cs
index 13fe196e8..5da898ded 100644
---
a/csharp/test/Drivers/Databricks/Unit/Auth/MandatoryTokenExchangeDelegatingHandlerTests.cs
+++
b/csharp/test/Drivers/Databricks/Unit/Auth/MandatoryTokenExchangeDelegatingHandlerTests.cs
@@ -16,6 +16,7 @@
*/
using System;
+using System.Linq;
using System.Net;
using System.Net.Http;
using System.Text;
@@ -125,7 +126,7 @@ namespace
Apache.Arrow.Adbc.Tests.Drivers.Databricks.Unit.Auth
}
[Fact]
- public async Task
SendAsync_WithExternalToken_StartsTokenExchangeInBackground()
+ public async Task
SendAsync_WithExternalToken_BlocksUntilTokenExchangeCompletes()
{
var tokenExchangeDelay = TimeSpan.FromMilliseconds(500);
var handler = new MandatoryTokenExchangeDelegatingHandler(
@@ -165,23 +166,20 @@ namespace
Apache.Arrow.Adbc.Tests.Drivers.Databricks.Unit.Auth
var httpClient = new HttpClient(handler);
- // First request should use original token and start background
exchange
+ // First request should block until token exchange completes, then
use exchanged token
var startTime = DateTime.UtcNow;
var response = await httpClient.SendAsync(request);
var requestDuration = DateTime.UtcNow - startTime;
Assert.Equal(expectedResponse, response);
- Assert.True(requestDuration < tokenExchangeDelay,
- $"Request took {requestDuration.TotalMilliseconds}ms, which is
longer than the token exchange delay of
{tokenExchangeDelay.TotalMilliseconds}ms");
+ Assert.True(requestDuration >= tokenExchangeDelay,
+ $"Request took {requestDuration.TotalMilliseconds}ms, which is
shorter than the token exchange delay of
{tokenExchangeDelay.TotalMilliseconds}ms. Expected blocking behavior.");
Assert.NotNull(capturedRequest);
Assert.Equal("Bearer",
capturedRequest.Headers.Authorization?.Scheme);
- Assert.Equal(_externalToken,
capturedRequest.Headers.Authorization?.Parameter); // First request uses
original token
+ Assert.Equal(_exchangedToken,
capturedRequest.Headers.Authorization?.Parameter); // First request uses
exchanged token
- // Wait for background task to complete
- await Task.Delay(tokenExchangeDelay +
TimeSpan.FromMilliseconds(1000));
-
- // Make a second request - this should use the exchanged token
+ // Make a second request - this should also use the exchanged
token (cached)
var request2 = new HttpRequestMessage(HttpMethod.Get,
"https://example.com/2");
request2.Headers.Authorization = new
System.Net.Http.Headers.AuthenticationHeaderValue("Bearer", _externalToken);
HttpRequestMessage? capturedRequest2 = null;
@@ -198,8 +196,9 @@ namespace
Apache.Arrow.Adbc.Tests.Drivers.Databricks.Unit.Auth
Assert.NotNull(capturedRequest2);
Assert.Equal("Bearer",
capturedRequest2.Headers.Authorization?.Scheme);
- Assert.Equal(_exchangedToken,
capturedRequest2.Headers.Authorization?.Parameter); // Second request uses
exchanged token
+ Assert.Equal(_exchangedToken,
capturedRequest2.Headers.Authorization?.Parameter); // Second request uses
cached exchanged token
+ // Token exchange should only be called once
_mockTokenExchangeClient.Verify(
x => x.ExchangeTokenAsync(_externalToken,
_identityFederationClientId, It.IsAny<CancellationToken>()),
Times.Once);
@@ -396,11 +395,15 @@ namespace
Apache.Arrow.Adbc.Tests.Drivers.Databricks.Unit.Auth
ExpiryTime = DateTime.UtcNow.AddHours(1)
};
- // Add a small delay to token exchange to simulate concurrent
access
+ var exchangeCallCount = 0;
+ var capturedRequests = new
System.Collections.Concurrent.ConcurrentBag<HttpRequestMessage>();
+
+ // Add a delay to token exchange to ensure concurrent requests
arrive while exchange is in progress
_mockTokenExchangeClient
.Setup(x => x.ExchangeTokenAsync(_externalToken,
_identityFederationClientId, It.IsAny<CancellationToken>()))
.Returns(async () =>
{
+ Interlocked.Increment(ref exchangeCallCount);
await Task.Delay(200);
return tokenExchangeResponse;
});
@@ -410,6 +413,7 @@ namespace
Apache.Arrow.Adbc.Tests.Drivers.Databricks.Unit.Auth
"SendAsync",
ItExpr.IsAny<HttpRequestMessage>(),
ItExpr.IsAny<CancellationToken>())
+ .Callback<HttpRequestMessage, CancellationToken>((req, ct) =>
capturedRequests.Add(req))
.ReturnsAsync(new HttpResponseMessage(HttpStatusCode.OK));
var httpClient = new HttpClient(handler);
@@ -424,13 +428,22 @@ namespace
Apache.Arrow.Adbc.Tests.Drivers.Databricks.Unit.Auth
await Task.WhenAll(tasks);
- // Wait for any background token exchange to complete
- await Task.Delay(1000);
-
// Token exchange should only be called once despite concurrent
requests
_mockTokenExchangeClient.Verify(
x => x.ExchangeTokenAsync(_externalToken,
_identityFederationClientId, It.IsAny<CancellationToken>()),
Times.Once);
+
+ Assert.Equal(1, exchangeCallCount);
+
+ // All requests should have been sent
+ Assert.Equal(3, capturedRequests.Count);
+
+ // All concurrent requests should use the exchanged token
+ // (they all wait for the same _pendingExchange task)
+ foreach (var request in capturedRequests)
+ {
+ Assert.Equal(_exchangedToken,
request.Headers.Authorization?.Parameter);
+ }
}
[Fact]