This is an automated email from the ASF dual-hosted git repository.
jbonofre pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-java.git
The following commit(s) were added to refs/heads/main by this push:
new 44c49baf6 GH-952: Add OAuth support (#953)
44c49baf6 is described below
commit 44c49baf6c2fdfcf20c8611f45c627c9438b2adb
Author: Hélder Gregório <[email protected]>
AuthorDate: Thu Jan 22 14:19:58 2026 +0000
GH-952: Add OAuth support (#953)
## What's Changed
- Add OAuth 2.0 support to the Flight SQL JDBC driver, including client
credentials and token exchange flows
- Integrate OAuth token acquisition into connection setup, wiring tokens
through OAuthCredentialWriter and updating gRPC credential handling to
fail fast on writer errors.
- Document new OAuth connection properties and add example clients for
both OAuth flows.
- Connection Properties Added:
  - oauth.flow
  - oauth.tokenUri
  - oauth.clientId
  - oauth.clientSecret
  - oauth.scope
  - oauth.resource
  - oauth.exchange.subjectToken
  - oauth.exchange.subjectTokenType
  - oauth.exchange.actorToken
  - oauth.exchange.actorTokenType
  - oauth.exchange.aud
  - oauth.exchange.requestedTokenType
- Connection config now recognizes oauth.* and oauth.exchange.*
properties and builds OAuth providers when oauth.flow is specified.
- Adds com.nimbusds:oauth2-oidc-sdk dependency and mockwebserver for
tests.
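
As an illustration of how the properties listed above fit together, here is a minimal sketch that assembles a `Properties` object for each flow. The class and method names (`OAuthConnectionProps`, `clientCredentials`, `tokenExchange`) are hypothetical and not part of this commit; only the property keys and the flow values come from the change itself.

```java
import java.util.Properties;

// Hypothetical helper for illustration only; it is not part of the driver.
class OAuthConnectionProps {

  // Properties for the client_credentials flow. The scope is optional,
  // so it is only set when a non-empty value is supplied.
  static Properties clientCredentials(
      String tokenUri, String clientId, String clientSecret, String scope) {
    Properties props = new Properties();
    props.setProperty("oauth.flow", "client_credentials");
    props.setProperty("oauth.tokenUri", tokenUri);
    props.setProperty("oauth.clientId", clientId);
    props.setProperty("oauth.clientSecret", clientSecret);
    if (scope != null && !scope.isEmpty()) {
      props.setProperty("oauth.scope", scope);
    }
    return props;
  }

  // Properties for the token_exchange flow. Only the two required
  // exchange properties are set; actor token, audience, and the other
  // optional keys could be added the same way.
  static Properties tokenExchange(
      String tokenUri, String subjectToken, String subjectTokenType) {
    Properties props = new Properties();
    props.setProperty("oauth.flow", "token_exchange");
    props.setProperty("oauth.tokenUri", tokenUri);
    props.setProperty("oauth.exchange.subjectToken", subjectToken);
    props.setProperty("oauth.exchange.subjectTokenType", subjectTokenType);
    return props;
  }
}
```

The resulting object would then be passed as the second argument to `DriverManager.getConnection(url, props)`, where `url` is the usual Flight SQL JDBC URL for the target server. Passing the values through `Properties` rather than the URI avoids having to URL-encode secrets and tokens.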
Closes #952.
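
For readers skimming the diff, the token caching in `AbstractOAuthTokenProvider` follows a double-checked locking pattern with an expiration buffer. The following is a simplified, standalone sketch of that pattern, not the committed code: the `CachedTokenSketch` and `Token` names are hypothetical, the fetch step is replaced by a plain `Supplier`, and the expiry check is an assumption about how `TokenInfo.isExpired` behaves.

```java
import java.time.Instant;
import java.util.function.Supplier;

// Simplified sketch of the caching pattern; names are illustrative only.
class CachedTokenSketch {

  static final class Token {
    final String value;
    final Instant expiresAt;

    Token(String value, Instant expiresAt) {
      this.value = value;
      this.expiresAt = expiresAt;
    }

    // Assumed semantics: a token counts as expired once the current time
    // plus the buffer reaches its expiry instant.
    boolean isExpired(long bufferSeconds) {
      return !Instant.now().plusSeconds(bufferSeconds).isBefore(expiresAt);
    }
  }

  private static final long BUFFER_SECONDS = 30;

  private final Object lock = new Object();
  private final Supplier<Token> fetcher;
  private volatile Token cached;

  CachedTokenSketch(Supplier<Token> fetcher) {
    this.fetcher = fetcher;
  }

  String getValidToken() {
    // Fast path: read the volatile field once, without taking the lock.
    Token token = cached;
    if (token != null && !token.isExpired(BUFFER_SECONDS)) {
      return token.value;
    }
    synchronized (lock) {
      // Re-check under the lock: another thread may have refreshed already.
      token = cached;
      if (token != null && !token.isExpired(BUFFER_SECONDS)) {
        return token.value;
      }
      token = fetcher.get();
      cached = token;
      return token.value;
    }
  }
}
```

With this shape, concurrent callers that find an expired token trigger a single fetch, and a token is refreshed shortly before it would actually expire rather than after a request has already failed.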
---
docs/source/flight_sql_jdbc_driver.rst | 123 ++++++
.../arrow/flight/grpc/CallCredentialAdapter.java | 12 +-
flight/flight-sql-jdbc-core/pom.xml | 32 ++
.../arrow/driver/jdbc/ArrowFlightConnection.java | 1 +
.../jdbc/client/ArrowFlightSqlClientHandler.java | 23 +-
.../client/oauth/AbstractOAuthTokenProvider.java | 108 +++++
.../oauth/ClientCredentialsTokenProvider.java | 58 +++
.../jdbc/client/oauth/OAuthConfiguration.java | 240 +++++++++++
.../jdbc/client/oauth/OAuthCredentialWriter.java | 42 ++
.../jdbc/client/oauth/OAuthTokenException.java | 31 ++
.../jdbc/client/oauth/OAuthTokenProvider.java | 33 ++
.../jdbc/client/oauth/OAuthTokenProviders.java | 419 ++++++++++++++++++
.../client/oauth/TokenExchangeTokenProvider.java | 81 ++++
.../arrow/driver/jdbc/client/oauth/TokenInfo.java | 45 ++
.../utils/ArrowFlightConnectionConfigImpl.java | 52 +++
.../arrow/driver/jdbc/OAuthIntegrationTest.java | 474 +++++++++++++++++++++
.../jdbc/client/oauth/OAuthConfigurationTest.java | 296 +++++++++++++
.../client/oauth/OAuthCredentialWriterTest.java | 95 +++++
.../flight-sql-jdbc-driver/src/shade/LICENSE.txt | 8 +
.../arrow/driver/jdbc/ITDriverJarValidation.java | 4 +
20 files changed, 2173 insertions(+), 4 deletions(-)
diff --git a/docs/source/flight_sql_jdbc_driver.rst b/docs/source/flight_sql_jdbc_driver.rst
index 180693094..4deb726b3 100644
--- a/docs/source/flight_sql_jdbc_driver.rst
+++ b/docs/source/flight_sql_jdbc_driver.rst
@@ -173,3 +173,126 @@ DriverManager#getConnection()
<https://docs.oracle.com/javase/8/docs/api/java/sql/DriverManager.html#getConnection-java.lang.String-java.lang.String-java.lang.String->`_,
the username and password supplied on the URI supercede the username and
password arguments to the function call.
+
+OAuth 2.0 Authentication
+========================
+
+The driver supports OAuth 2.0 authentication for obtaining access tokens
+from an authorization server. Two OAuth flows are currently supported:
+
+* **Client Credentials** - For service-to-service authentication where no
+ user interaction is required. The application authenticates using its own
+ credentials (client ID and client secret).
+
+* **Token Exchange** (RFC 8693) - For exchanging one token for another,
+ commonly used for federated authentication, delegation, or impersonation
+ scenarios.
+
+OAuth Connection Properties
+---------------------------
+
+The following properties configure OAuth authentication. These properties
+should be provided via the ``Properties`` object when connecting, as they
+may contain special characters that are difficult to encode in a URI.
+
+**Common OAuth Properties**
+
+.. list-table::
+ :header-rows: 1
+
+ * - Parameter
+ - Type
+ - Required
+ - Default
+ - Description
+
+ * - oauth.flow
+ - String
+ - Yes (to enable OAuth)
+ - null
+ - The OAuth grant type. Supported values: ``client_credentials``,
+ ``token_exchange``
+
+ * - oauth.tokenUri
+ - String
+ - Yes
+ - null
+ - The OAuth 2.0 token endpoint URL (e.g.,
+ ``https://auth.example.com/oauth/token``)
+
+ * - oauth.clientId
+ - String
+ - Conditional
+ - null
+ - The OAuth 2.0 client ID. Required for ``client_credentials`` flow,
+ optional for ``token_exchange``
+
+ * - oauth.clientSecret
+ - String
+ - Conditional
+ - null
+ - The OAuth 2.0 client secret. Required for ``client_credentials`` flow,
+ optional for ``token_exchange``
+
+ * - oauth.scope
+ - String
+ - No
+ - null
+ - Space-separated list of OAuth scopes to request
+
+ * - oauth.resource
+ - String
+ - No
+ - null
+ - The resource indicator for the token request (RFC 8707)
+
+**Token Exchange Properties**
+
+These properties are specific to the ``token_exchange`` flow:
+
+.. list-table::
+ :header-rows: 1
+
+ * - Parameter
+ - Type
+ - Required
+ - Default
+ - Description
+
+ * - oauth.exchange.subjectToken
+ - String
+ - Yes
+ - null
+ - The subject token to exchange (e.g., a JWT from an identity provider)
+
+ * - oauth.exchange.subjectTokenType
+ - String
+ - Yes
+ - null
+ - The token type URI of the subject token. Common values:
+ ``urn:ietf:params:oauth:token-type:access_token``,
+ ``urn:ietf:params:oauth:token-type:jwt``
+
+ * - oauth.exchange.actorToken
+ - String
+ - No
+ - null
+ - The actor token for delegation/impersonation scenarios
+
+ * - oauth.exchange.actorTokenType
+ - String
+ - No
+ - null
+ - The token type URI of the actor token
+
+ * - oauth.exchange.aud
+ - String
+ - No
+ - null
+ - The target audience for the exchanged token
+
+ * - oauth.exchange.requestedTokenType
+ - String
+ - No
+ - null
+ - The desired token type for the exchanged token
diff --git a/flight/flight-core/src/main/java/org/apache/arrow/flight/grpc/CallCredentialAdapter.java b/flight/flight-core/src/main/java/org/apache/arrow/flight/grpc/CallCredentialAdapter.java
index f33e9b2f9..fe81f3fb2 100644
--- a/flight/flight-core/src/main/java/org/apache/arrow/flight/grpc/CallCredentialAdapter.java
+++ b/flight/flight-core/src/main/java/org/apache/arrow/flight/grpc/CallCredentialAdapter.java
@@ -18,6 +18,7 @@ package org.apache.arrow.flight.grpc;
import io.grpc.CallCredentials;
import io.grpc.Metadata;
+import io.grpc.Status;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import org.apache.arrow.flight.CallHeaders;
@@ -36,9 +37,14 @@ public class CallCredentialAdapter extends CallCredentials {
RequestInfo requestInfo, Executor executor, MetadataApplier metadataApplier) {
executor.execute(
() -> {
- final Metadata headers = new Metadata();
- credentialWriter.accept(new MetadataAdapter(headers));
- metadataApplier.apply(headers);
+ try {
+ final Metadata headers = new Metadata();
+ credentialWriter.accept(new MetadataAdapter(headers));
+ metadataApplier.apply(headers);
+ } catch (Throwable t) {
+ metadataApplier.fail(
+ Status.UNAUTHENTICATED.withCause(t).withDescription(t.getMessage()));
+ }
});
}
diff --git a/flight/flight-sql-jdbc-core/pom.xml b/flight/flight-sql-jdbc-core/pom.xml
index bb191ca9e..da00baf32 100644
--- a/flight/flight-sql-jdbc-core/pom.xml
+++ b/flight/flight-sql-jdbc-core/pom.xml
@@ -120,6 +120,31 @@ under the License.
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>com.squareup.okhttp3</groupId>
+ <artifactId>mockwebserver3</artifactId>
+ <version>5.3.2</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.squareup.okhttp3</groupId>
+ <artifactId>mockwebserver3-junit5</artifactId>
+ <version>5.3.2</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.squareup.okhttp3</groupId>
+ <artifactId>okhttp-jvm</artifactId>
+ <version>5.3.2</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.squareup.okio</groupId>
+ <artifactId>okio-jvm</artifactId>
+ <version>3.16.4</version>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
@@ -153,6 +178,13 @@ under the License.
<artifactId>caffeine</artifactId>
<version>3.2.3</version>
</dependency>
+
+ <dependency>
+ <groupId>com.nimbusds</groupId>
+ <artifactId>oauth2-oidc-sdk</artifactId>
+ <version>11.20.1</version>
+ </dependency>
+
</dependencies>
<build>
diff --git a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightConnection.java b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightConnection.java
index f81233ec3..0e9c198f5 100644
--- a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightConnection.java
+++ b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightConnection.java
@@ -122,6 +122,7 @@ public final class ArrowFlightConnection extends AvaticaConnection {
.withClientCache(config.useClientCache() ? new FlightClientCache() : null)
.withConnectTimeout(config.getConnectTimeout())
.withDriverVersion(driverVersion)
+ .withOAuthConfiguration(config.getOauthConfiguration())
.build();
} catch (final SQLException e) {
try {
diff --git a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java
index 666996cd9..f0ea28423 100644
--- a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java
+++ b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java
@@ -32,6 +32,9 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import org.apache.arrow.driver.jdbc.client.oauth.OAuthConfiguration;
+import org.apache.arrow.driver.jdbc.client.oauth.OAuthCredentialWriter;
+import org.apache.arrow.driver.jdbc.client.oauth.OAuthTokenProvider;
import org.apache.arrow.driver.jdbc.client.utils.ClientAuthenticationUtils;
import org.apache.arrow.driver.jdbc.client.utils.FlightClientCache;
import org.apache.arrow.driver.jdbc.client.utils.FlightLocationQueue;
@@ -675,6 +678,8 @@ public final class ArrowFlightSqlClientHandler implements AutoCloseable {
@VisibleForTesting @Nullable Duration connectTimeout;
+ @VisibleForTesting @Nullable OAuthConfiguration oauthConfig;
+
// These two middleware are for internal use within build() and should not be
// exposed by builder
// APIs.
@@ -714,6 +719,7 @@ public final class ArrowFlightSqlClientHandler implements AutoCloseable {
this.clientKeyPath = original.clientKeyPath;
this.allocator = original.allocator;
this.catalog = original.catalog;
+ this.oauthConfig = original.oauthConfig;
if (original.retainCookies) {
this.cookieFactory = original.cookieFactory;
@@ -983,6 +989,17 @@ public final class ArrowFlightSqlClientHandler implements AutoCloseable {
return this;
}
+ /**
+ * Sets the OAuth configuration for this handler.
+ *
+ * @param oauthConfig the OAuth configuration
+ * @return this builder instance
+ */
+ public Builder withOAuthConfiguration(final OAuthConfiguration oauthConfig) {
+ this.oauthConfig = oauthConfig;
+ return this;
+ }
+
public String getCacheKey() {
return getLocation().toString();
}
@@ -1070,7 +1087,11 @@ public final class ArrowFlightSqlClientHandler implements AutoCloseable {
FlightGrpcUtils.createFlightClient(
allocator, channelBuilder.build(), clientBuilder.middleware());
final ArrayList<CallOption> credentialOptions = new ArrayList<>();
- if (isUsingUserPasswordAuth) {
+ // Authentication priority: OAuth > token > username/password
+ if (oauthConfig != null) {
+ OAuthTokenProvider tokenProvider = oauthConfig.createTokenProvider();
+ credentialOptions.add(new CredentialCallOption(new OAuthCredentialWriter(tokenProvider)));
+ } else if (isUsingUserPasswordAuth) {
// If the authFactory has already been used for a handshake, use the existing
// token.
// This can occur if the authFactory is being re-used for a new connection
diff --git a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/oauth/AbstractOAuthTokenProvider.java b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/oauth/AbstractOAuthTokenProvider.java
new file mode 100644
index 000000000..9c377a585
--- /dev/null
+++ b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/oauth/AbstractOAuthTokenProvider.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.driver.jdbc.client.oauth;
+
+import com.nimbusds.oauth2.sdk.ParseException;
+import com.nimbusds.oauth2.sdk.Scope;
+import com.nimbusds.oauth2.sdk.TokenErrorResponse;
+import com.nimbusds.oauth2.sdk.TokenRequest;
+import com.nimbusds.oauth2.sdk.TokenResponse;
+import com.nimbusds.oauth2.sdk.auth.ClientAuthentication;
+import com.nimbusds.oauth2.sdk.token.AccessToken;
+import java.io.IOException;
+import java.net.URI;
+import java.sql.SQLException;
+import java.time.Instant;
+import org.apache.arrow.util.VisibleForTesting;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * Abstract base class for OAuth token providers that handles token caching, refresh logic, and
+ * common request/response handling.
+ */
+public abstract class AbstractOAuthTokenProvider implements OAuthTokenProvider {
+ protected static final int EXPIRATION_BUFFER_SECONDS = 30;
+ protected static final int DEFAULT_EXPIRATION_SECONDS = 3600;
+
+ private final Object tokenLock = new Object();
+ private volatile @Nullable TokenInfo cachedToken;
+
+ @VisibleForTesting URI tokenUri;
+
+ @VisibleForTesting @Nullable ClientAuthentication clientAuth;
+
+ @VisibleForTesting @Nullable Scope scope;
+
+ @Override
+ public String getValidToken() throws SQLException {
+ TokenInfo token = cachedToken;
+ if (token != null && !token.isExpired(EXPIRATION_BUFFER_SECONDS)) {
+ return token.getAccessToken();
+ }
+
+ synchronized (tokenLock) {
+ token = cachedToken;
+ if (token != null && !token.isExpired(EXPIRATION_BUFFER_SECONDS)) {
+ return token.getAccessToken();
+ }
+ cachedToken = fetchNewToken();
+ return cachedToken.getAccessToken();
+ }
+ }
+
+ /**
+ * Fetches a new token from the authorization server. This method handles the common
+ * request/response logic while delegating flow-specific request building to subclasses.
+ *
+ * @return the new token information
+ * @throws SQLException if token cannot be obtained
+ */
+ protected TokenInfo fetchNewToken() throws SQLException {
+ try {
+ TokenRequest request = buildTokenRequest();
+ TokenResponse response = TokenResponse.parse(request.toHTTPRequest().send());
+
+ if (!response.indicatesSuccess()) {
+ TokenErrorResponse errorResponse = response.toErrorResponse();
+ String errorMsg =
+ String.format(
+ "OAuth request failed: %s - %s",
+ errorResponse.getErrorObject().getCode(),
+ errorResponse.getErrorObject().getDescription());
+ throw new SQLException(errorMsg);
+ }
+
+ AccessToken accessToken = response.toSuccessResponse().getTokens().getAccessToken();
+ long expiresIn =
+ accessToken.getLifetime() > 0 ? accessToken.getLifetime() : DEFAULT_EXPIRATION_SECONDS;
+ Instant expiresAt = Instant.now().plusSeconds(expiresIn);
+
+ return new TokenInfo(accessToken.getValue(), expiresAt);
+ } catch (ParseException e) {
+ throw new SQLException("Failed to parse OAuth token response", e);
+ } catch (IOException e) {
+ throw new SQLException("Failed to send OAuth token request", e);
+ }
+ }
+
+ /**
+ * Builds the flow-specific token request.
+ *
+ * @return the token request to send to the authorization server
+ */
+ protected abstract TokenRequest buildTokenRequest();
+}
diff --git a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/oauth/ClientCredentialsTokenProvider.java b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/oauth/ClientCredentialsTokenProvider.java
new file mode 100644
index 000000000..7e6289819
--- /dev/null
+++ b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/oauth/ClientCredentialsTokenProvider.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.driver.jdbc.client.oauth;
+
+import com.nimbusds.oauth2.sdk.ClientCredentialsGrant;
+import com.nimbusds.oauth2.sdk.Scope;
+import com.nimbusds.oauth2.sdk.TokenRequest;
+import com.nimbusds.oauth2.sdk.auth.ClientSecretBasic;
+import com.nimbusds.oauth2.sdk.auth.Secret;
+import com.nimbusds.oauth2.sdk.id.ClientID;
+import java.net.URI;
+import java.util.Objects;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * OAuth 2.0 Client Credentials flow token provider (RFC 6749 Section 4.4).
+ *
+ * <p>This provider handles service-to-service authentication where no user interaction is required.
+ * Tokens are cached and automatically refreshed before expiration.
+ */
+public class ClientCredentialsTokenProvider extends AbstractOAuthTokenProvider {
+
+ /**
+ * Creates a new ClientCredentialsTokenProvider.
+ *
+ * @param tokenUri the OAuth token endpoint URI
+ * @param clientId the OAuth client ID
+ * @param clientSecret the OAuth client secret
+ * @param scope optional OAuth scopes (space-separated)
+ */
+ ClientCredentialsTokenProvider(
+ URI tokenUri, String clientId, String clientSecret, @Nullable String scope) {
+ this.tokenUri = Objects.requireNonNull(tokenUri, "tokenUri cannot be null");
+ Objects.requireNonNull(clientId, "clientId cannot be null");
+ Objects.requireNonNull(clientSecret, "clientSecret cannot be null");
+ this.clientAuth = new ClientSecretBasic(new ClientID(clientId), new Secret(clientSecret));
+ this.scope = (scope != null && !scope.isEmpty()) ? Scope.parse(scope) : null;
+ }
+
+ @Override
+ protected TokenRequest buildTokenRequest() {
+ return new TokenRequest(tokenUri, clientAuth, new ClientCredentialsGrant(), scope);
+ }
+}
diff --git a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/oauth/OAuthConfiguration.java b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/oauth/OAuthConfiguration.java
new file mode 100644
index 000000000..cba9d4c2e
--- /dev/null
+++ b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/oauth/OAuthConfiguration.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.driver.jdbc.client.oauth;
+
+import com.nimbusds.oauth2.sdk.GrantType;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.sql.SQLException;
+import java.util.Locale;
+import java.util.Objects;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/** Configuration class for OAuth settings parsed from connection properties. */
+public class OAuthConfiguration {
+
+ private final GrantType grantType;
+ private final URI tokenUri;
+ private final @Nullable String clientId;
+ private final @Nullable String clientSecret;
+ private final @Nullable String scope;
+ private final @Nullable String subjectToken;
+ private final @Nullable String subjectTokenType;
+ private final @Nullable String actorToken;
+ private final @Nullable String actorTokenType;
+ private final @Nullable String audience;
+ private final @Nullable String resource;
+ private final @Nullable String requestedTokenType;
+
+ private OAuthConfiguration(Builder builder) throws SQLException {
+ this.grantType = builder.grantType;
+ this.tokenUri = builder.tokenUri;
+ this.clientId = builder.clientId;
+ this.clientSecret = builder.clientSecret;
+ this.scope = builder.scope;
+ this.subjectToken = builder.subjectToken;
+ this.subjectTokenType = builder.subjectTokenType;
+ this.actorToken = builder.actorToken;
+ this.actorTokenType = builder.actorTokenType;
+ this.audience = builder.audience;
+ this.resource = builder.resource;
+ this.requestedTokenType = builder.requestedTokenType;
+
+ validate();
+ }
+
+ private void validate() throws SQLException {
+ Objects.requireNonNull(grantType, "OAuth grant type is required");
+ Objects.requireNonNull(tokenUri, "Token URI is required");
+
+ if (GrantType.CLIENT_CREDENTIALS.equals(grantType)) {
+ if (clientId == null || clientId.isEmpty()) {
+ throw new SQLException("clientId is required for client_credentials flow");
+ }
+ if (clientSecret == null || clientSecret.isEmpty()) {
+ throw new SQLException("clientSecret is required for client_credentials flow");
+ }
+ } else if (GrantType.TOKEN_EXCHANGE.equals(grantType)) {
+ if (subjectToken == null || subjectToken.isEmpty()) {
+ throw new SQLException("subjectToken is required for token_exchange flow");
+ }
+ if (subjectTokenType == null || subjectTokenType.isEmpty()) {
+ throw new SQLException("subjectTokenType is required for token_exchange flow");
+ }
+ } else {
+ throw new SQLException("Unsupported OAuth grant type: " + grantType);
+ }
+ }
+
+ /**
+ * Creates an OAuthTokenProvider based on the configured grant type.
+ *
+ * @return the token provider
+ * @throws SQLException if the grant type is not supported or configuration is invalid
+ */
+ public OAuthTokenProvider createTokenProvider() throws SQLException {
+ if (GrantType.CLIENT_CREDENTIALS.equals(grantType)) {
+ return OAuthTokenProviders.clientCredentials()
+ .tokenUri(tokenUri)
+ .clientId(clientId)
+ .clientSecret(clientSecret)
+ .scope(scope)
+ .build();
+ } else if (GrantType.TOKEN_EXCHANGE.equals(grantType)) {
+ OAuthTokenProviders.TokenExchangeBuilder builder =
+ OAuthTokenProviders.tokenExchange()
+ .tokenUri(tokenUri)
+ .subjectToken(subjectToken)
+ .subjectTokenType(subjectTokenType)
+ .actorToken(actorToken)
+ .actorTokenType(actorTokenType)
+ .audience(audience)
+ .requestedTokenType(requestedTokenType)
+ .scope(scope)
+ .resource(resource);
+
+ if (clientId != null && clientSecret != null) {
+ builder.clientCredentials(clientId, clientSecret);
+ }
+
+ return builder.build();
+ } else {
+ throw new SQLException("Unsupported OAuth grant type: " + grantType);
+ }
+ }
+
+ /** Builder for OAuthConfiguration. */
+ public static class Builder {
+ private GrantType grantType;
+ private URI tokenUri;
+ private @Nullable String clientId;
+ private @Nullable String clientSecret;
+ private @Nullable String scope;
+ private @Nullable String subjectToken;
+ private @Nullable String subjectTokenType;
+ private @Nullable String actorToken;
+ private @Nullable String actorTokenType;
+ private @Nullable String audience;
+ private @Nullable String resource;
+ private @Nullable String requestedTokenType;
+
+ /**
+ * Sets the OAuth grant type from a string value.
+ *
+ * <p>Accepts either user-friendly names ("client_credentials", "token_exchange") or the full
+ * URN format as defined in RFC 6749 and RFC 8693.
+ *
+ * @param flowStr the flow type string (e.g., "client_credentials", "token_exchange")
+ * @return this builder
+ * @throws SQLException if the flow string is invalid
+ */
+ public Builder flow(String flowStr) throws SQLException {
+ if (flowStr == null || flowStr.isEmpty()) {
+ throw new SQLException("OAuth flow cannot be null or empty");
+ }
+ try {
+ String normalized = flowStr.toLowerCase(Locale.ROOT);
+ // Map user-friendly names to URN format for token_exchange
+ if ("token_exchange".equals(normalized)) {
+ normalized = GrantType.TOKEN_EXCHANGE.getValue();
+ }
+ GrantType parsed = GrantType.parse(normalized);
+ if (!parsed.equals(GrantType.CLIENT_CREDENTIALS)
+ && !parsed.equals(GrantType.TOKEN_EXCHANGE)) {
+ throw new SQLException("Unsupported OAuth flow: " + flowStr);
+ }
+ this.grantType = parsed;
+ } catch (com.nimbusds.oauth2.sdk.ParseException e) {
+ throw new SQLException("Invalid OAuth flow: " + flowStr, e);
+ }
+ return this;
+ }
+
+ /**
+ * Sets the token URI.
+ *
+ * @param tokenUri the OAuth token endpoint URI
+ * @return this builder
+ * @throws SQLException if the URI is invalid
+ */
+ public Builder tokenUri(String tokenUri) throws SQLException {
+ if (tokenUri == null || tokenUri.isEmpty()) {
+ throw new SQLException("Token URI cannot be null or empty");
+ }
+ try {
+ this.tokenUri = new URI(tokenUri);
+ } catch (URISyntaxException e) {
+ throw new SQLException("Invalid token URI: " + tokenUri, e);
+ }
+ return this;
+ }
+
+ public Builder clientId(@Nullable String clientId) {
+ this.clientId = clientId;
+ return this;
+ }
+
+ public Builder clientSecret(@Nullable String clientSecret) {
+ this.clientSecret = clientSecret;
+ return this;
+ }
+
+ public Builder scope(@Nullable String scope) {
+ this.scope = scope;
+ return this;
+ }
+
+ public Builder subjectToken(@Nullable String subjectToken) {
+ this.subjectToken = subjectToken;
+ return this;
+ }
+
+ public Builder subjectTokenType(@Nullable String subjectTokenType) {
+ this.subjectTokenType = subjectTokenType;
+ return this;
+ }
+
+ public Builder actorToken(@Nullable String actorToken) {
+ this.actorToken = actorToken;
+ return this;
+ }
+
+ public Builder actorTokenType(@Nullable String actorTokenType) {
+ this.actorTokenType = actorTokenType;
+ return this;
+ }
+
+ public Builder audience(@Nullable String audience) {
+ this.audience = audience;
+ return this;
+ }
+
+ public Builder resource(@Nullable String resource) {
+ this.resource = resource;
+ return this;
+ }
+
+ public Builder requestedTokenType(@Nullable String requestedTokenType) {
+ this.requestedTokenType = requestedTokenType;
+ return this;
+ }
+
+ public OAuthConfiguration build() throws SQLException {
+ return new OAuthConfiguration(this);
+ }
+ }
+}
diff --git a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/oauth/OAuthCredentialWriter.java b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/oauth/OAuthCredentialWriter.java
new file mode 100644
index 000000000..0d4ad4689
--- /dev/null
+++ b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/oauth/OAuthCredentialWriter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.driver.jdbc.client.oauth;
+
+import java.sql.SQLException;
+import java.util.Objects;
+import java.util.function.Consumer;
+import org.apache.arrow.flight.CallHeaders;
+import org.apache.arrow.flight.auth2.Auth2Constants;
+
+/** Writes OAuth bearer tokens to Flight call headers. */
+public class OAuthCredentialWriter implements Consumer<CallHeaders> {
+ private final OAuthTokenProvider tokenProvider;
+
+ public OAuthCredentialWriter(OAuthTokenProvider tokenProvider) {
+ this.tokenProvider = Objects.requireNonNull(tokenProvider, "tokenProvider cannot be null");
+ }
+
+ @Override
+ public void accept(CallHeaders headers) {
+ try {
+ String token = tokenProvider.getValidToken();
+ headers.insert(Auth2Constants.AUTHORIZATION_HEADER, Auth2Constants.BEARER_PREFIX + token);
+ } catch (SQLException e) {
+ throw new OAuthTokenException("Failed to obtain OAuth token", e);
+ }
+ }
+}
diff --git a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/oauth/OAuthTokenException.java b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/oauth/OAuthTokenException.java
new file mode 100644
index 000000000..aceadb327
--- /dev/null
+++ b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/oauth/OAuthTokenException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.driver.jdbc.client.oauth;
+
+/**
+ * Runtime exception thrown when OAuth token operations fail. Used to wrap checked exceptions in
+ * contexts that don't allow them.
+ */
+public class OAuthTokenException extends RuntimeException {
+ public OAuthTokenException(String message) {
+ super(message);
+ }
+
+ public OAuthTokenException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
diff --git a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/oauth/OAuthTokenProvider.java b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/oauth/OAuthTokenProvider.java
new file mode 100644
index 000000000..241611e43
--- /dev/null
+++ b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/oauth/OAuthTokenProvider.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.driver.jdbc.client.oauth;
+
+import java.sql.SQLException;
+
+/**
+ * Interface for OAuth token providers that handle token acquisition and refresh. Implementations
+ * should cache tokens and automatically refresh them before expiration.
+ */
+public interface OAuthTokenProvider {
+ /**
+ * Gets a valid OAuth access token, refreshing if necessary.
+ *
+ * @return a valid access token string
+ * @throws SQLException if token cannot be obtained
+ */
+ String getValidToken() throws SQLException;
+}
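The caching contract stated in the OAuthTokenProvider Javadoc (cache the token, refresh before expiration) might look roughly like the sketch below. It is an assumption about one reasonable shape, not a copy of the commit's AbstractOAuthTokenProvider, which is not reproduced in this part of the diff. CachingTokenSketch, its nested Token class, and the Supplier-based fetcher are all hypothetical.

```java
import java.time.Instant;
import java.util.function.Supplier;

/** Hypothetical sketch of a cache-and-refresh token provider. */
public class CachingTokenSketch {

  /** Hypothetical token holder: the token value plus its absolute expiry. */
  public static final class Token {
    final String value;
    final Instant expiresAt;

    public Token(String value, Instant expiresAt) {
      this.value = value;
      this.expiresAt = expiresAt;
    }
  }

  private final Supplier<Token> fetcher; // performs the real token request
  private final int bufferSeconds; // refresh this many seconds before expiry
  private Token cached;

  public CachingTokenSketch(Supplier<Token> fetcher, int bufferSeconds) {
    this.fetcher = fetcher;
    this.bufferSeconds = bufferSeconds;
  }

  /** Returns the cached token, fetching a new one if none exists or it is about to expire. */
  public synchronized String getValidToken() {
    if (cached == null || Instant.now().plusSeconds(bufferSeconds).isAfter(cached.expiresAt)) {
      cached = fetcher.get();
    }
    return cached.value;
  }

  public static void main(String[] args) {
    int[] calls = {0};
    CachingTokenSketch provider =
        new CachingTokenSketch(
            () -> new Token("t" + (++calls[0]), Instant.now().plusSeconds(3600)), 30);
    provider.getValidToken();
    provider.getValidToken();
    System.out.println("fetches: " + calls[0]);
  }
}
```

The method is synchronized so that concurrent callers do not trigger duplicate token requests; whether the driver makes the same choice is not visible here.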
diff --git
a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/oauth/OAuthTokenProviders.java
b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/oauth/OAuthTokenProviders.java
new file mode 100644
index 000000000..bbf7072d3
--- /dev/null
+++
b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/oauth/OAuthTokenProviders.java
@@ -0,0 +1,419 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.driver.jdbc.client.oauth;
+
+import com.nimbusds.oauth2.sdk.ParseException;
+import com.nimbusds.oauth2.sdk.Scope;
+import com.nimbusds.oauth2.sdk.auth.ClientAuthentication;
+import com.nimbusds.oauth2.sdk.auth.ClientSecretBasic;
+import com.nimbusds.oauth2.sdk.auth.Secret;
+import com.nimbusds.oauth2.sdk.id.Audience;
+import com.nimbusds.oauth2.sdk.id.ClientID;
+import com.nimbusds.oauth2.sdk.token.TokenTypeURI;
+import com.nimbusds.oauth2.sdk.token.TypelessAccessToken;
+import com.nimbusds.oauth2.sdk.tokenexchange.TokenExchangeGrant;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * Unified factory for creating OAuth token providers.
+ *
+ * <p>This class provides a single entry point for creating all OAuth token providers with a
+ * consistent builder API. It supports:
+ *
+ * <ul>
+ * <li>Client Credentials flow (RFC 6749 Section 4.4)
+ * <li>Token Exchange flow (RFC 8693)
+ * </ul>
+ *
+ * <p>Example usage:
+ *
+ * <pre>{@code
+ * // Client Credentials flow
+ * OAuthTokenProvider provider = OAuthTokenProviders.clientCredentials()
+ * .tokenUri("https://auth.example.com/token")
+ * .clientId("my-client")
+ * .clientSecret("my-secret")
+ * .scope("read write")
+ * .build();
+ *
+ * // Token Exchange flow
+ * OAuthTokenProvider provider = OAuthTokenProviders.tokenExchange()
+ * .tokenUri("https://auth.example.com/token")
+ * .subjectToken("user-token")
+ * .subjectTokenType("urn:ietf:params:oauth:token-type:access_token")
+ * .build();
+ * }</pre>
+ */
+public final class OAuthTokenProviders {
+
+ private OAuthTokenProviders() {}
+
+ /**
+ * Creates a new builder for Client Credentials flow.
+ *
+ * @return a new ClientCredentialsBuilder instance
+ */
+ public static ClientCredentialsBuilder clientCredentials() {
+ return new ClientCredentialsBuilder();
+ }
+
+ /**
+ * Creates a new builder for Token Exchange flow.
+ *
+ * @return a new TokenExchangeBuilder instance
+ */
+ public static TokenExchangeBuilder tokenExchange() {
+ return new TokenExchangeBuilder();
+ }
+
+ /** Builder for creating {@link ClientCredentialsTokenProvider} instances. */
+ public static class ClientCredentialsBuilder {
+ private @Nullable URI tokenUri;
+ private @Nullable String clientId;
+ private @Nullable String clientSecret;
+ private @Nullable String scope;
+
+ ClientCredentialsBuilder() {}
+
+ /**
+ * Sets the OAuth token endpoint URI (required).
+ *
+ * @param tokenUri the token endpoint URI
+ * @return this builder
+ */
+ public ClientCredentialsBuilder tokenUri(URI tokenUri) {
+ this.tokenUri = Objects.requireNonNull(tokenUri, "tokenUri cannot be null");
+ return this;
+ }
+
+ /**
+ * Sets the OAuth token endpoint URI from a string (required).
+ *
+ * @param tokenUri the token endpoint URI string
+ * @return this builder
+ * @throws IllegalArgumentException if the URI is invalid
+ */
+ public ClientCredentialsBuilder tokenUri(String tokenUri) {
+ Objects.requireNonNull(tokenUri, "tokenUri cannot be null");
+ try {
+ this.tokenUri = new URI(tokenUri);
+ } catch (URISyntaxException e) {
+ throw new IllegalArgumentException("Invalid token URI: " + tokenUri, e);
+ }
+ return this;
+ }
+
+ /**
+ * Sets the OAuth client ID (required).
+ *
+ * @param clientId the client ID
+ * @return this builder
+ */
+ public ClientCredentialsBuilder clientId(String clientId) {
+ this.clientId = Objects.requireNonNull(clientId, "clientId cannot be null");
+ return this;
+ }
+
+ /**
+ * Sets the OAuth client secret (required).
+ *
+ * @param clientSecret the client secret
+ * @return this builder
+ */
+ public ClientCredentialsBuilder clientSecret(String clientSecret) {
+ this.clientSecret = Objects.requireNonNull(clientSecret, "clientSecret cannot be null");
+ return this;
+ }
+
+ /**
+ * Sets the OAuth scopes (optional).
+ *
+ * @param scope the space-separated scope string
+ * @return this builder
+ */
+ public ClientCredentialsBuilder scope(@Nullable String scope) {
+ this.scope = scope;
+ return this;
+ }
+
+ /**
+ * Builds a new ClientCredentialsTokenProvider instance.
+ *
+ * @return the configured ClientCredentialsTokenProvider
+ * @throws IllegalStateException if required parameters are missing
+ */
+ public ClientCredentialsTokenProvider build() {
+ if (tokenUri == null) {
+ throw new IllegalStateException("tokenUri is required");
+ }
+ if (clientId == null) {
+ throw new IllegalStateException("clientId is required");
+ }
+ if (clientSecret == null) {
+ throw new IllegalStateException("clientSecret is required");
+ }
+ return new ClientCredentialsTokenProvider(tokenUri, clientId, clientSecret, scope);
+ }
+ }
+
+ /** Builder for creating {@link TokenExchangeTokenProvider} instances. */
+ public static class TokenExchangeBuilder {
+ private @Nullable URI tokenUri;
+ private @Nullable String subjectToken;
+ private @Nullable String subjectTokenType;
+ private @Nullable String actorToken;
+ private @Nullable String actorTokenType;
+ private @Nullable String audience;
+ private @Nullable String requestedTokenType;
+ private @Nullable Scope scope;
+ private @Nullable List<URI> resources;
+ private @Nullable ClientAuthentication clientAuth;
+
+ TokenExchangeBuilder() {}
+
+ /**
+ * Sets the OAuth token endpoint URI (required).
+ *
+ * @param tokenUri the token endpoint URI
+ * @return this builder
+ */
+ public TokenExchangeBuilder tokenUri(URI tokenUri) {
+ this.tokenUri = Objects.requireNonNull(tokenUri, "tokenUri cannot be null");
+ return this;
+ }
+
+ /**
+ * Sets the OAuth token endpoint URI from a string (required).
+ *
+ * @param tokenUri the token endpoint URI string
+ * @return this builder
+ * @throws IllegalArgumentException if the URI is invalid
+ */
+ public TokenExchangeBuilder tokenUri(String tokenUri) {
+ Objects.requireNonNull(tokenUri, "tokenUri cannot be null");
+ try {
+ this.tokenUri = new URI(tokenUri);
+ } catch (URISyntaxException e) {
+ throw new IllegalArgumentException("Invalid token URI: " + tokenUri, e);
+ }
+ return this;
+ }
+
+ /**
+ * Sets the subject token to exchange (required).
+ *
+ * @param subjectToken the subject token value
+ * @return this builder
+ */
+ public TokenExchangeBuilder subjectToken(String subjectToken) {
+ this.subjectToken = Objects.requireNonNull(subjectToken, "subjectToken cannot be null");
+ return this;
+ }
+
+ /**
+ * Sets the type of the subject token (required).
+ *
+ * @param subjectTokenType the subject token type URI
+ * @return this builder
+ */
+ public TokenExchangeBuilder subjectTokenType(String subjectTokenType) {
+ this.subjectTokenType =
+ Objects.requireNonNull(subjectTokenType, "subjectTokenType cannot be null");
+ return this;
+ }
+
+ /**
+ * Sets the optional actor token for delegation scenarios.
+ *
+ * @param actorToken the actor token value
+ * @return this builder
+ */
+ public TokenExchangeBuilder actorToken(@Nullable String actorToken) {
+ this.actorToken = actorToken;
+ return this;
+ }
+
+ /**
+ * Sets the type of the actor token.
+ *
+ * @param actorTokenType the actor token type URI
+ * @return this builder
+ */
+ public TokenExchangeBuilder actorTokenType(@Nullable String actorTokenType) {
+ this.actorTokenType = actorTokenType;
+ return this;
+ }
+
+ /**
+ * Sets the target audience for the exchanged token.
+ *
+ * @param audience the target audience
+ * @return this builder
+ */
+ public TokenExchangeBuilder audience(@Nullable String audience) {
+ this.audience = audience;
+ return this;
+ }
+
+ /**
+ * Sets the requested token type for the exchanged token.
+ *
+ * @param requestedTokenType the requested token type URI
+ * @return this builder
+ */
+ public TokenExchangeBuilder requestedTokenType(@Nullable String requestedTokenType) {
+ this.requestedTokenType = requestedTokenType;
+ return this;
+ }
+
+ /**
+ * Sets the OAuth scopes for the token request.
+ *
+ * @param scope the OAuth scope object
+ * @return this builder
+ */
+ public TokenExchangeBuilder scope(@Nullable Scope scope) {
+ this.scope = scope;
+ return this;
+ }
+
+ /**
+ * Sets the OAuth scopes from a space-separated string.
+ *
+ * @param scope the space-separated scope string
+ * @return this builder
+ */
+ public TokenExchangeBuilder scope(@Nullable String scope) {
+ this.scope = (scope != null && !scope.isEmpty()) ? Scope.parse(scope) : null;
+ return this;
+ }
+
+ /**
+ * Sets the target resource URIs (RFC 8707).
+ *
+ * @param resources the list of resource URIs
+ * @return this builder
+ */
+ public TokenExchangeBuilder resources(@Nullable List<URI> resources) {
+ this.resources = resources;
+ return this;
+ }
+
+ /**
+ * Sets a single target resource URI (RFC 8707).
+ *
+ * @param resource the resource URI
+ * @return this builder
+ */
+ public TokenExchangeBuilder resource(@Nullable URI resource) {
+ this.resources = resource != null ? Collections.singletonList(resource) : null;
+ return this;
+ }
+
+ /**
+ * Sets a single target resource URI from a string (RFC 8707).
+ *
+ * @param resource the resource URI string
+ * @return this builder
+ */
+ public TokenExchangeBuilder resource(@Nullable String resource) {
+ if (resource != null && !resource.isEmpty()) {
+ this.resources = Collections.singletonList(URI.create(resource));
+ } else {
+ this.resources = null;
+ }
+ return this;
+ }
+
+ /**
+ * Sets the client authentication.
+ *
+ * @param clientAuth the client authentication object
+ * @return this builder
+ */
+ public TokenExchangeBuilder clientAuthentication(@Nullable ClientAuthentication clientAuth) {
+ this.clientAuth = clientAuth;
+ return this;
+ }
+
+ /**
+ * Sets client authentication using client ID and secret.
+ *
+ * @param clientId the client ID
+ * @param clientSecret the client secret
+ * @return this builder
+ */
+ public TokenExchangeBuilder clientCredentials(String clientId, String clientSecret) {
+ Objects.requireNonNull(clientId, "clientId cannot be null");
+ Objects.requireNonNull(clientSecret, "clientSecret cannot be null");
+ this.clientAuth = new ClientSecretBasic(new ClientID(clientId), new Secret(clientSecret));
+ return this;
+ }
+
+ /**
+ * Builds a new TokenExchangeTokenProvider instance.
+ *
+ * @return the configured TokenExchangeTokenProvider
+ * @throws IllegalStateException if required parameters are missing
+ */
+ public TokenExchangeTokenProvider build() {
+ if (tokenUri == null) {
+ throw new IllegalStateException("tokenUri is required");
+ }
+ if (subjectToken == null) {
+ throw new IllegalStateException("subjectToken is required");
+ }
+ if (subjectTokenType == null) {
+ throw new IllegalStateException("subjectTokenType is required");
+ }
+
+ TokenExchangeGrant grant = createGrant();
+ return new TokenExchangeTokenProvider(tokenUri, grant, clientAuth, scope, resources);
+ }
+
+ private TokenExchangeGrant createGrant() {
+ try {
+ TypelessAccessToken subjectAccessToken = new TypelessAccessToken(subjectToken);
+ TokenTypeURI subjectTypeUri = TokenTypeURI.parse(subjectTokenType);
+
+ TypelessAccessToken actorAccessToken =
+ actorToken != null ? new TypelessAccessToken(actorToken) : null;
+ TokenTypeURI actorTypeUri =
+ actorTokenType != null ? TokenTypeURI.parse(actorTokenType) : null;
+ TokenTypeURI requestedTypeUri =
+ requestedTokenType != null ? TokenTypeURI.parse(requestedTokenType) : null;
+ List<Audience> audienceList =
+ audience != null ? Collections.singletonList(new Audience(audience)) : null;
+
+ return new TokenExchangeGrant(
+ subjectAccessToken,
+ subjectTypeUri,
+ actorAccessToken,
+ actorTypeUri,
+ requestedTypeUri,
+ audienceList);
+ } catch (ParseException e) {
+ throw new IllegalStateException("Failed to create TokenExchangeGrant", e);
+ }
+ }
+ }
+}
diff --git
a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/oauth/TokenExchangeTokenProvider.java
b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/oauth/TokenExchangeTokenProvider.java
new file mode 100644
index 000000000..af433a271
--- /dev/null
+++
b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/oauth/TokenExchangeTokenProvider.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.driver.jdbc.client.oauth;
+
+import com.nimbusds.oauth2.sdk.Scope;
+import com.nimbusds.oauth2.sdk.TokenRequest;
+import com.nimbusds.oauth2.sdk.auth.ClientAuthentication;
+import com.nimbusds.oauth2.sdk.tokenexchange.TokenExchangeGrant;
+import java.net.URI;
+import java.util.List;
+import java.util.Objects;
+import org.apache.arrow.util.VisibleForTesting;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * OAuth 2.0 Token Exchange flow token provider (RFC 8693).
+ *
+ * <p>This provider exchanges one token for another, commonly used for federated authentication,
+ * delegation, or impersonation scenarios. Tokens are cached and automatically refreshed.
+ */
+public class TokenExchangeTokenProvider extends AbstractOAuthTokenProvider {
+
+ @VisibleForTesting TokenExchangeGrant grant;
+
+ @VisibleForTesting @Nullable List<URI> resources;
+
+ /**
+ * Creates a new TokenExchangeTokenProvider with full configuration.
+ *
+ * @param tokenUri the OAuth token endpoint URI
+ * @param grant the token exchange grant containing subject/actor token information
+ * @param clientAuth optional client authentication
+ * @param scope optional OAuth scopes
+ * @param resource optional list of target resource URIs (RFC 8707)
+ */
+ TokenExchangeTokenProvider(
+ URI tokenUri,
+ TokenExchangeGrant grant,
+ @Nullable ClientAuthentication clientAuth,
+ @Nullable Scope scope,
+ @Nullable List<URI> resource) {
+ this.tokenUri = Objects.requireNonNull(tokenUri, "tokenUri cannot be null");
+ this.grant = Objects.requireNonNull(grant, "grant cannot be null");
+ this.scope = scope;
+ this.resources = resource;
+ this.clientAuth = clientAuth;
+ }
+
+ @Override
+ protected TokenRequest buildTokenRequest() {
+ TokenRequest.Builder builder;
+ if (clientAuth != null) {
+ builder = new TokenRequest.Builder(tokenUri, clientAuth, grant);
+ } else {
+ builder = new TokenRequest.Builder(tokenUri, grant);
+ }
+
+ if (scope != null) {
+ builder.scope(scope);
+ }
+ if (resources != null) {
+ builder.resources(resources.toArray(new URI[0]));
+ }
+
+ return builder.build();
+ }
+}
diff --git
a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/oauth/TokenInfo.java
b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/oauth/TokenInfo.java
new file mode 100644
index 000000000..f47cc8b05
--- /dev/null
+++
b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/oauth/TokenInfo.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.driver.jdbc.client.oauth;
+
+import java.time.Instant;
+import java.util.Objects;
+
+/** Holds OAuth token information including the access token and expiration time. */
+public class TokenInfo {
+ private final String accessToken;
+ private final Instant expiresAt;
+
+ public TokenInfo(String accessToken, Instant expiresAt) {
+ this.accessToken = Objects.requireNonNull(accessToken, "accessToken cannot be null");
+ this.expiresAt = Objects.requireNonNull(expiresAt, "expiresAt cannot be null");
+ }
+
+ public String getAccessToken() {
+ return accessToken;
+ }
+
+ /**
+ * Checks if the token is expired or will expire within the buffer period.
+ *
+ * @param bufferSeconds seconds before actual expiration to consider token expired
+ * @return true if token should be refreshed
+ */
+ public boolean isExpired(int bufferSeconds) {
+ return Instant.now().plusSeconds(bufferSeconds).isAfter(expiresAt);
+ }
+}
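The buffer comparison in TokenInfo.isExpired can be checked with fixed instants. The sketch below repeats the same expression but takes the current time as a parameter so the outcome is deterministic. ExpiryBufferSketch and the explicit `now` argument are assumptions made for illustration and are not part of the commit.

```java
import java.time.Instant;

/** Hypothetical sketch; mirrors the comparison in TokenInfo.isExpired with an injected clock. */
public class ExpiryBufferSketch {

  /** True when the token has expired or will expire within bufferSeconds of now. */
  public static boolean isExpired(Instant now, Instant expiresAt, int bufferSeconds) {
    return now.plusSeconds(bufferSeconds).isAfter(expiresAt);
  }

  public static void main(String[] args) {
    Instant now = Instant.parse("2026-01-22T14:00:00Z");
    Instant expiresAt = now.plusSeconds(20);

    // A 30 s buffer reaches past the expiry time, so the token counts as expired.
    System.out.println(isExpired(now, expiresAt, 30));
    // A 10 s buffer stays before the expiry time, so the token is still usable.
    System.out.println(isExpired(now, expiresAt, 10));
    // isAfter is strict: a buffer that lands exactly on the expiry time is not expired.
    System.out.println(isExpired(now, expiresAt, 20));
  }
}
```

Because `isAfter` is a strict comparison, a token whose expiry equals now plus the buffer is still treated as valid.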
diff --git
a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/utils/ArrowFlightConnectionConfigImpl.java
b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/utils/ArrowFlightConnectionConfigImpl.java
index 76ba964a5..d0ba74dbc 100644
---
a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/utils/ArrowFlightConnectionConfigImpl.java
+++
b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/utils/ArrowFlightConnectionConfigImpl.java
@@ -16,6 +16,7 @@
*/
package org.apache.arrow.driver.jdbc.utils;
+import java.sql.SQLException;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
@@ -23,6 +24,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import org.apache.arrow.driver.jdbc.ArrowFlightConnection;
+import org.apache.arrow.driver.jdbc.client.oauth.OAuthConfiguration;
import org.apache.arrow.flight.CallHeaders;
import org.apache.arrow.flight.CallOption;
import org.apache.arrow.flight.FlightCallHeaders;
@@ -31,6 +33,7 @@ import org.apache.arrow.util.Preconditions;
import org.apache.calcite.avatica.ConnectionConfig;
import org.apache.calcite.avatica.ConnectionConfigImpl;
import org.apache.calcite.avatica.ConnectionProperty;
+import org.checkerframework.checker.nullness.qual.Nullable;
/** A {@link ConnectionConfig} for the {@link ArrowFlightConnection}. */
public final class ArrowFlightConnectionConfigImpl extends ConnectionConfigImpl {
@@ -211,6 +214,38 @@ public final class ArrowFlightConnectionConfigImpl extends ConnectionConfigImpl
return headers;
}
+ /**
+ * Returns OAuth configuration if oauth.flow is specified, null otherwise.
+ *
+ * @return the OAuth configuration or null
+ * @throws SQLException if the OAuth configuration is invalid
+ */
+ public @Nullable OAuthConfiguration getOauthConfiguration() throws SQLException {
+ String flow = ArrowFlightConnectionProperty.OAUTH_FLOW.getString(properties);
+ if (flow == null) {
+ return null;
+ }
+
+ return new OAuthConfiguration.Builder()
+ .flow(flow)
+ .clientId(ArrowFlightConnectionProperty.OAUTH_CLIENT_ID.getString(properties))
+ .clientSecret(ArrowFlightConnectionProperty.OAUTH_CLIENT_SECRET.getString(properties))
+ .tokenUri(ArrowFlightConnectionProperty.OAUTH_TOKEN_URI.getString(properties))
+ .scope(ArrowFlightConnectionProperty.OAUTH_SCOPE.getString(properties))
+ .resource(ArrowFlightConnectionProperty.OAUTH_RESOURCE.getString(properties))
+ .subjectToken(
+ ArrowFlightConnectionProperty.OAUTH_EXCHANGE_SUBJECT_TOKEN.getString(properties))
+ .subjectTokenType(
+ ArrowFlightConnectionProperty.OAUTH_EXCHANGE_SUBJECT_TOKEN_TYPE.getString(properties))
+ .actorToken(ArrowFlightConnectionProperty.OAUTH_EXCHANGE_ACTOR_TOKEN.getString(properties))
+ .actorTokenType(
+ ArrowFlightConnectionProperty.OAUTH_EXCHANGE_ACTOR_TOKEN_TYPE.getString(properties))
+ .audience(ArrowFlightConnectionProperty.OAUTH_EXCHANGE_AUDIENCE.getString(properties))
+ .requestedTokenType(
+ ArrowFlightConnectionProperty.OAUTH_EXCHANGE_REQUESTED_TOKEN_TYPE.getString(properties))
+ .build();
+ }
+
/** Custom {@link ConnectionProperty} for the {@link ArrowFlightConnectionConfigImpl}. */
public enum ArrowFlightConnectionProperty implements ConnectionProperty {
HOST("host", null, Type.STRING, true),
@@ -232,6 +267,23 @@ public final class ArrowFlightConnectionConfigImpl extends ConnectionConfigImpl
CATALOG("catalog", null, Type.STRING, false),
CONNECT_TIMEOUT_MILLIS("connectTimeoutMs", 10000, Type.NUMBER, false),
USE_CLIENT_CACHE("useClientCache", true, Type.BOOLEAN, false),
+
+ // OAuth configuration properties
+ OAUTH_FLOW("oauth.flow", null, Type.STRING, false),
+ OAUTH_CLIENT_ID("oauth.clientId", null, Type.STRING, false),
+ OAUTH_CLIENT_SECRET("oauth.clientSecret", null, Type.STRING, false),
+ OAUTH_TOKEN_URI("oauth.tokenUri", null, Type.STRING, false),
+ OAUTH_SCOPE("oauth.scope", null, Type.STRING, false),
+ OAUTH_RESOURCE("oauth.resource", null, Type.STRING, false),
+
+ // Token exchange specific properties
+ OAUTH_EXCHANGE_SUBJECT_TOKEN("oauth.exchange.subjectToken", null, Type.STRING, false),
+ OAUTH_EXCHANGE_SUBJECT_TOKEN_TYPE("oauth.exchange.subjectTokenType", null, Type.STRING, false),
+ OAUTH_EXCHANGE_ACTOR_TOKEN("oauth.exchange.actorToken", null, Type.STRING, false),
+ OAUTH_EXCHANGE_ACTOR_TOKEN_TYPE("oauth.exchange.actorTokenType", null, Type.STRING, false),
+ OAUTH_EXCHANGE_AUDIENCE("oauth.exchange.aud", null, Type.STRING, false),
+ OAUTH_EXCHANGE_REQUESTED_TOKEN_TYPE(
+ "oauth.exchange.requestedTokenType", null, Type.STRING, false),
;
private final String camelName;
diff --git
a/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/OAuthIntegrationTest.java
b/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/OAuthIntegrationTest.java
new file mode 100644
index 000000000..5e782db03
--- /dev/null
+++
b/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/OAuthIntegrationTest.java
@@ -0,0 +1,474 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.driver.jdbc;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.net.URI;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import mockwebserver3.MockResponse;
+import mockwebserver3.MockWebServer;
+import mockwebserver3.RecordedRequest;
+import mockwebserver3.junit5.StartStop;
+import org.apache.arrow.driver.jdbc.authentication.TokenAuthentication;
+import org.apache.arrow.driver.jdbc.utils.ArrowFlightConnectionConfigImpl.ArrowFlightConnectionProperty;
+import org.apache.arrow.driver.jdbc.utils.MockFlightSqlProducer;
+import org.apache.arrow.flight.sql.FlightSqlProducer.Schemas;
+import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetCatalogs;
+import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetDbSchemas;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.util.AutoCloseables;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+/**
+ * Integration tests for OAuth authentication flows in the JDBC driver.
+ *
+ * <p>These tests verify that OAuth tokens obtained from an OAuth server are correctly used in
+ * Flight SQL requests.
+ */
+public class OAuthIntegrationTest {
+
+ private static final String VALID_ACCESS_TOKEN = "valid-oauth-access-token-12345";
+ private static final String CLIENT_ID = "test-client-id";
+ private static final String CLIENT_SECRET = "test-client-secret";
+ private static final String SUBJECT_TOKEN = "original-subject-token";
+ private static final String SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:jwt";
+ private static final String TEST_SCOPE = "dremio.all";
+
+ private static final MockFlightSqlProducer FLIGHT_SQL_PRODUCER = new MockFlightSqlProducer();
+
+ @RegisterExtension public static FlightServerTestExtension FLIGHT_SERVER_TEST_EXTENSION;
+
+ static {
+ FLIGHT_SERVER_TEST_EXTENSION =
+ new FlightServerTestExtension.Builder()
+ .authentication(new TokenAuthentication.Builder().token(VALID_ACCESS_TOKEN).build())
+ .producer(FLIGHT_SQL_PRODUCER)
+ .build();
+ }
+
+ @StartStop private final MockWebServer oauthServer = new MockWebServer();
+ private URI tokenEndpoint;
+
+ @BeforeAll
+ public static void setUpClass() {
+ // Register a simple catalog query handler
+ FLIGHT_SQL_PRODUCER.addCatalogQuery(
+ CommandGetCatalogs.getDefaultInstance(),
+ listener -> {
+ try (BufferAllocator allocator = new RootAllocator();
+ VectorSchemaRoot root =
+ VectorSchemaRoot.create(Schemas.GET_CATALOGS_SCHEMA, allocator)) {
+ root.setRowCount(0);
+ listener.start(root);
+ listener.putNext();
+ } catch (Throwable t) {
+ listener.error(t);
+ } finally {
+ listener.completed();
+ }
+ });
+
+ // Register a simple schema query handler for getSchemas()
+ FLIGHT_SQL_PRODUCER.addCatalogQuery(
+ CommandGetDbSchemas.getDefaultInstance(),
+ listener -> {
+ try (BufferAllocator allocator = new RootAllocator();
+ VectorSchemaRoot root =
+ VectorSchemaRoot.create(Schemas.GET_SCHEMAS_SCHEMA, allocator)) {
+ root.setRowCount(0);
+ listener.start(root);
+ listener.putNext();
+ } catch (Throwable t) {
+ listener.error(t);
+ } finally {
+ listener.completed();
+ }
+ });
+ }
+
+ @AfterAll
+ public static void tearDownClass() {
+ AutoCloseables.closeNoChecked(FLIGHT_SQL_PRODUCER);
+ }
+
+ @BeforeEach
+ public void setUp() {
+ tokenEndpoint = oauthServer.url("/oauth/token").uri();
+ }
+
+ @AfterEach
+ public void tearDown() {
+ oauthServer.close();
+ }
+
+ // Helper methods for mock OAuth responses
+
+ private void enqueueSuccessfulTokenResponse() {
+ enqueueSuccessfulTokenResponse(VALID_ACCESS_TOKEN, 3600);
+ }
+
+ private void enqueueSuccessfulTokenResponse(String token, int expiresIn) {
+ String body =
+ String.format(
+ "{\"access_token\":\"%s\",\"token_type\":\"Bearer\",\"expires_in\":%d}",
+ token, expiresIn);
+ oauthServer.enqueue(
+ new MockResponse.Builder()
+ .code(200)
+ .setHeader("Content-Type", "application/json")
+ .body(body)
+ .build());
+ }
+
+ private void enqueueErrorResponse(String error, String description) {
+ String body =
+ String.format("{\"error\":\"%s\",\"error_description\":\"%s\"}", error, description);
+ oauthServer.enqueue(
+ new MockResponse.Builder()
+ .code(400)
+ .setHeader("Content-Type", "application/json")
+ .body(body)
+ .build());
+ }
+
+ private Properties createBaseProperties() {
+ Properties props = new Properties();
+ props.put(ArrowFlightConnectionProperty.HOST.camelName(), "localhost");
+ props.put(
+ ArrowFlightConnectionProperty.PORT.camelName(), FLIGHT_SERVER_TEST_EXTENSION.getPort());
+ props.put(ArrowFlightConnectionProperty.USE_ENCRYPTION.camelName(), false);
+ return props;
+ }
+
+ private String getJdbcUrl() {
+ return String.format(
+ "jdbc:arrow-flight-sql://localhost:%d", FLIGHT_SERVER_TEST_EXTENSION.getPort());
+ }
+
+ // ==================== Client Credentials Flow Tests ====================
+
+ @Test
+ public void testClientCredentialsFlowSuccess() throws Exception {
+ enqueueSuccessfulTokenResponse();
+
+ Properties props = createBaseProperties();
+ props.put(ArrowFlightConnectionProperty.OAUTH_FLOW.camelName(), "client_credentials");
+ props.put(ArrowFlightConnectionProperty.OAUTH_TOKEN_URI.camelName(), tokenEndpoint.toString());
+ props.put(ArrowFlightConnectionProperty.OAUTH_CLIENT_ID.camelName(), CLIENT_ID);
+ props.put(ArrowFlightConnectionProperty.OAUTH_CLIENT_SECRET.camelName(), CLIENT_SECRET);
+ props.put(ArrowFlightConnectionProperty.OAUTH_SCOPE.camelName(), TEST_SCOPE);
+
+ try (Connection conn = DriverManager.getConnection(getJdbcUrl(), props)) {
+ assertFalse(conn.isClosed());
+ // Trigger a Flight call to force OAuth token retrieval
+ conn.getMetaData().getCatalogs().close();
+ }
+
+ // Verify OAuth request was made
+ RecordedRequest oauthRequest = oauthServer.takeRequest(5, TimeUnit.SECONDS);
+ assertNotNull(oauthRequest, "OAuth request should have been made");
+ assertEquals("POST", oauthRequest.getMethod());
+ String body = oauthRequest.getBody().utf8();
+ assertTrue(body.contains("grant_type=client_credentials"));
+ assertTrue(body.contains("scope=" + TEST_SCOPE));
+ }
+
+ @Test
+ public void testClientCredentialsFlowWithUrlParameters() throws Exception {
+ enqueueSuccessfulTokenResponse();
+
+ String url =
+ String.format(
+ "jdbc:arrow-flight-sql://localhost:%d?useEncryption=false"
+ + "&oauth.flow=client_credentials"
+ + "&oauth.tokenUri=%s"
+ + "&oauth.clientId=%s"
+ + "&oauth.clientSecret=%s",
+ FLIGHT_SERVER_TEST_EXTENSION.getPort(),
+ tokenEndpoint.toString(),
+ CLIENT_ID,
+ CLIENT_SECRET);
+
+ try (Connection conn = DriverManager.getConnection(url)) {
+ conn.getMetaData().getCatalogs().close();
+ }
+
+ assertEquals(1, oauthServer.getRequestCount());
+ }
+
+ @Test
+ public void testClientCredentialsFlowInvalidCredentials() throws Exception {
+ enqueueErrorResponse("invalid_client", "Client authentication failed");
+
+ Properties props = createBaseProperties();
+ props.put(ArrowFlightConnectionProperty.OAUTH_FLOW.camelName(), "client_credentials");
+ props.put(ArrowFlightConnectionProperty.OAUTH_TOKEN_URI.camelName(), tokenEndpoint.toString());
+ props.put(ArrowFlightConnectionProperty.OAUTH_CLIENT_ID.camelName(), "wrong-client");
+ props.put(ArrowFlightConnectionProperty.OAUTH_CLIENT_SECRET.camelName(), "wrong-secret");
+
+ Exception ex =
+ assertThrows(
+ Exception.class,
+ () -> {
+ try (Connection conn = DriverManager.getConnection(getJdbcUrl(), props)) {
+ conn.getMetaData().getCatalogs().close();
+ }
+ });
+ // Verify the error message contains the OAuth error somewhere in the exception chain
+ assertTrue(
+ containsInExceptionChain(ex, "invalid_client"),
+ "Exception chain should contain 'invalid_client'");
+ }
+
+ private boolean containsInExceptionChain(Throwable t, String message) {
+ while (t != null) {
+ if (t.getMessage() != null && t.getMessage().contains(message)) {
+ return true;
+ }
+ t = t.getCause();
+ }
+ return false;
+ }
+
+ // ==================== Token Exchange Flow Tests ====================
+
+ @Test
+ public void testTokenExchangeFlowMinimalParameters() throws Exception {
+ enqueueSuccessfulTokenResponse();
+
+ Properties props = createBaseProperties();
+ props.put(ArrowFlightConnectionProperty.OAUTH_FLOW.camelName(), "token_exchange");
+ props.put(ArrowFlightConnectionProperty.OAUTH_TOKEN_URI.camelName(), tokenEndpoint.toString());
+ props.put(
+ ArrowFlightConnectionProperty.OAUTH_EXCHANGE_SUBJECT_TOKEN.camelName(), SUBJECT_TOKEN);
+ props.put(
+ ArrowFlightConnectionProperty.OAUTH_EXCHANGE_SUBJECT_TOKEN_TYPE.camelName(),
+ SUBJECT_TOKEN_TYPE);
+
+ try (Connection conn = DriverManager.getConnection(getJdbcUrl(), props)) {
+ conn.getMetaData().getCatalogs().close();
+ }
+
+ RecordedRequest oauthRequest = oauthServer.takeRequest(5, TimeUnit.SECONDS);
+ assertNotNull(oauthRequest, "OAuth request should have been made");
+ String body = oauthRequest.getBody().utf8();
+ assertTrue(
+ body.contains("grant_type=urn%3Aietf%3Aparams%3Aoauth%3Agrant-type%3Atoken-exchange"),
+ "Should contain token exchange grant type");
+ assertTrue(body.contains("subject_token=" + SUBJECT_TOKEN));
+ }
+
+ @Test
+ public void testTokenExchangeFlowWithAllParameters() throws Exception {
+ enqueueSuccessfulTokenResponse();
+
+ String actorToken = "actor-token-value";
+ String actorTokenType = "urn:ietf:params:oauth:token-type:access_token";
+ String audience = "https://api.example.com";
+ String resource = "https://api.example.com/resource";
+ String requestedTokenType = "urn:ietf:params:oauth:token-type:access_token";
+
+ Properties props = createBaseProperties();
+ props.put(ArrowFlightConnectionProperty.OAUTH_FLOW.camelName(), "token_exchange");
+ props.put(ArrowFlightConnectionProperty.OAUTH_TOKEN_URI.camelName(), tokenEndpoint.toString());
+ props.put(ArrowFlightConnectionProperty.OAUTH_CLIENT_ID.camelName(), CLIENT_ID);
+ props.put(ArrowFlightConnectionProperty.OAUTH_CLIENT_SECRET.camelName(), CLIENT_SECRET);
+ props.put(ArrowFlightConnectionProperty.OAUTH_SCOPE.camelName(), TEST_SCOPE);
+ props.put(
+ ArrowFlightConnectionProperty.OAUTH_EXCHANGE_SUBJECT_TOKEN.camelName(), SUBJECT_TOKEN);
+ props.put(
+ ArrowFlightConnectionProperty.OAUTH_EXCHANGE_SUBJECT_TOKEN_TYPE.camelName(),
+ SUBJECT_TOKEN_TYPE);
+ props.put(ArrowFlightConnectionProperty.OAUTH_EXCHANGE_ACTOR_TOKEN.camelName(), actorToken);
+ props.put(
+ ArrowFlightConnectionProperty.OAUTH_EXCHANGE_ACTOR_TOKEN_TYPE.camelName(), actorTokenType);
+ props.put(ArrowFlightConnectionProperty.OAUTH_EXCHANGE_AUDIENCE.camelName(), audience);
+ props.put(ArrowFlightConnectionProperty.OAUTH_RESOURCE.camelName(), resource);
+ props.put(
+ ArrowFlightConnectionProperty.OAUTH_EXCHANGE_REQUESTED_TOKEN_TYPE.camelName(),
+ requestedTokenType);
+
+ try (Connection conn = DriverManager.getConnection(getJdbcUrl(), props)) {
+ conn.getMetaData().getCatalogs().close();
+ }
+
+ RecordedRequest oauthRequest = oauthServer.takeRequest(5, TimeUnit.SECONDS);
+ assertNotNull(oauthRequest, "OAuth request should have been made");
+ String body = oauthRequest.getBody().utf8();
+ assertTrue(body.contains("subject_token=" + SUBJECT_TOKEN));
+ assertTrue(body.contains("actor_token=" + actorToken));
+ }
+
+ @Test
+ public void testTokenExchangeFlowWithClientAuthentication() throws Exception {
+ enqueueSuccessfulTokenResponse();
+
+ Properties props = createBaseProperties();
+ props.put(ArrowFlightConnectionProperty.OAUTH_FLOW.camelName(), "token_exchange");
+ props.put(ArrowFlightConnectionProperty.OAUTH_TOKEN_URI.camelName(), tokenEndpoint.toString());
+ props.put(ArrowFlightConnectionProperty.OAUTH_CLIENT_ID.camelName(), CLIENT_ID);
+ props.put(ArrowFlightConnectionProperty.OAUTH_CLIENT_SECRET.camelName(), CLIENT_SECRET);
+ props.put(
+ ArrowFlightConnectionProperty.OAUTH_EXCHANGE_SUBJECT_TOKEN.camelName(), SUBJECT_TOKEN);
+ props.put(
+ ArrowFlightConnectionProperty.OAUTH_EXCHANGE_SUBJECT_TOKEN_TYPE.camelName(),
+ SUBJECT_TOKEN_TYPE);
+
+ try (Connection conn = DriverManager.getConnection(getJdbcUrl(), props)) {
+ conn.getMetaData().getCatalogs().close();
+ }
+
+ RecordedRequest oauthRequest = oauthServer.takeRequest(5, TimeUnit.SECONDS);
+ assertNotNull(oauthRequest, "OAuth request should have been made");
+ String authHeader = oauthRequest.getHeaders().get("Authorization");
+ assertNotNull(authHeader, "Should have Basic auth header for client authentication");
+ assertTrue(authHeader.startsWith("Basic "));
+ }
+
+ // ==================== Token Caching Tests ====================
+
+ @Test
+ public void testTokenCachingAcrossMultipleOperations() throws Exception {
+ enqueueSuccessfulTokenResponse();
+
+ Properties props = createBaseProperties();
+ props.put(ArrowFlightConnectionProperty.OAUTH_FLOW.camelName(), "client_credentials");
+ props.put(ArrowFlightConnectionProperty.OAUTH_TOKEN_URI.camelName(), tokenEndpoint.toString());
+ props.put(ArrowFlightConnectionProperty.OAUTH_CLIENT_ID.camelName(), CLIENT_ID);
+ props.put(ArrowFlightConnectionProperty.OAUTH_CLIENT_SECRET.camelName(), CLIENT_SECRET);
+
+ try (Connection conn = DriverManager.getConnection(getJdbcUrl(), props)) {
+ // Execute multiple operations
+ conn.isValid(5);
+ conn.getMetaData().getCatalogs().close();
+ conn.getMetaData().getSchemas().close();
+ }
+
+ // Should only have made one OAuth request due to caching
+ assertEquals(1, oauthServer.getRequestCount());
+ }
+
+ @Test
+ public void testTokenRefreshAfterExpiration() throws Exception {
+ enqueueSuccessfulTokenResponse(VALID_ACCESS_TOKEN, 1);
+ enqueueSuccessfulTokenResponse(VALID_ACCESS_TOKEN, 3600);
+
+ Properties props = createBaseProperties();
+ props.put(ArrowFlightConnectionProperty.OAUTH_FLOW.camelName(), "token_exchange");
+ props.put(ArrowFlightConnectionProperty.OAUTH_TOKEN_URI.camelName(), tokenEndpoint.toString());
+ props.put(
+ ArrowFlightConnectionProperty.OAUTH_EXCHANGE_SUBJECT_TOKEN.camelName(), SUBJECT_TOKEN);
+ props.put(
+ ArrowFlightConnectionProperty.OAUTH_EXCHANGE_SUBJECT_TOKEN_TYPE.camelName(),
+ SUBJECT_TOKEN_TYPE);
+
+ try (Connection conn = DriverManager.getConnection(getJdbcUrl(), props)) {
+ // First operation triggers initial token fetch
+ conn.getMetaData().getCatalogs().close();
+
+ // Token with 1s expiry is immediately considered expired (due to 30s buffer)
+ // so the next operation should trigger a refresh
+ conn.getMetaData().getCatalogs().close();
+ }
+
+ // Should have made exactly 2 OAuth requests: initial + refresh
+ assertEquals(2, oauthServer.getRequestCount());
+ }
+
+ // ==================== Error Handling Tests ====================
+
+ @Test
+ public void testMissingRequiredParametersClientCredentials() {
+ Properties props = createBaseProperties();
+ props.put(ArrowFlightConnectionProperty.OAUTH_FLOW.camelName(), "client_credentials");
+ props.put(ArrowFlightConnectionProperty.OAUTH_TOKEN_URI.camelName(), tokenEndpoint.toString());
+ // Missing client_id and client_secret
+
+ assertThrows(SQLException.class, () -> DriverManager.getConnection(getJdbcUrl(), props));
+ }
+
+ @Test
+ public void testMissingRequiredParametersTokenExchange() {
+ Properties props = createBaseProperties();
+ props.put(ArrowFlightConnectionProperty.OAUTH_FLOW.camelName(), "token_exchange");
+ props.put(ArrowFlightConnectionProperty.OAUTH_TOKEN_URI.camelName(), tokenEndpoint.toString());
+ // Missing subject_token and subject_token_type
+
+ assertThrows(SQLException.class, () -> DriverManager.getConnection(getJdbcUrl(), props));
+ }
+
+ @Test
+ public void testInvalidOAuthFlow() {
+ Properties props = createBaseProperties();
+ props.put(ArrowFlightConnectionProperty.OAUTH_FLOW.camelName(), "invalid_flow");
+ props.put(ArrowFlightConnectionProperty.OAUTH_TOKEN_URI.camelName(), tokenEndpoint.toString());
+
+ assertThrows(SQLException.class, () -> DriverManager.getConnection(getJdbcUrl(), props));
+ }
+
+ @Test
+ public void testMalformedTokenEndpoint() {
+ Properties props = createBaseProperties();
+ props.put(ArrowFlightConnectionProperty.OAUTH_FLOW.camelName(), "client_credentials");
+ props.put(ArrowFlightConnectionProperty.OAUTH_TOKEN_URI.camelName(), "not-a-valid-uri://");
+ props.put(ArrowFlightConnectionProperty.OAUTH_CLIENT_ID.camelName(), CLIENT_ID);
+ props.put(ArrowFlightConnectionProperty.OAUTH_CLIENT_SECRET.camelName(), CLIENT_SECRET);
+
+ assertThrows(SQLException.class, () -> DriverManager.getConnection(getJdbcUrl(), props));
+ }
+
+ // ==================== Authorization Header Verification ====================
+
+ @Test
+ public void testOAuthTokenSentAsBearer() throws Exception {
+ enqueueSuccessfulTokenResponse();
+
+ Properties props = createBaseProperties();
+ props.put(ArrowFlightConnectionProperty.OAUTH_FLOW.camelName(), "client_credentials");
+ props.put(ArrowFlightConnectionProperty.OAUTH_TOKEN_URI.camelName(), tokenEndpoint.toString());
+ props.put(ArrowFlightConnectionProperty.OAUTH_CLIENT_ID.camelName(), CLIENT_ID);
+ props.put(ArrowFlightConnectionProperty.OAUTH_CLIENT_SECRET.camelName(), CLIENT_SECRET);
+
+ try (Connection conn = DriverManager.getConnection(getJdbcUrl(), props)) {
+ conn.getMetaData().getCatalogs().close();
+ }
+
+ // Verify the Flight server received the bearer token
+ String authHeader =
+ FLIGHT_SERVER_TEST_EXTENSION
+ .getInterceptorFactory()
+ .getHeader(org.apache.arrow.flight.FlightMethod.GET_FLIGHT_INFO, "authorization");
+ assertNotNull(authHeader, "Authorization header should be present in Flight requests");
+ assertEquals("Bearer " + VALID_ACCESS_TOKEN, authHeader);
+ }
+}
diff --git a/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/client/oauth/OAuthConfigurationTest.java b/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/client/oauth/OAuthConfigurationTest.java
new file mode 100644
index 000000000..c258a7c65
--- /dev/null
+++ b/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/client/oauth/OAuthConfigurationTest.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.driver.jdbc.client.oauth;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import com.nimbusds.oauth2.sdk.Scope;
+import java.net.URI;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.stream.Stream;
+import org.junit.jupiter.api.Named;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+/** Tests for {@link OAuthConfiguration}. */
+public class OAuthConfigurationTest {
+
+ private static final String TOKEN_URI = "https://auth.example.com/token";
+ private static final String CLIENT_ID = "test-client-id";
+ private static final String CLIENT_SECRET = "test-client-secret";
+ private static final String SCOPE = "read write";
+ private static final String SUBJECT_TOKEN = "subject-token-value";
+ public static final String RESOURCE = "https://api.example.com/resource";
+
+ @FunctionalInterface
+ interface BuilderConfigurer {
+ void configure(OAuthConfiguration.Builder builder) throws SQLException;
+ }
+
+ static Stream<Arguments> createFlowCases() {
+ return Stream.of(
+ Arguments.of(
+ Named.of(
+ "string flow", (BuilderConfigurer) builder -> builder.flow("client_credentials"))),
+ Arguments.of(
+ Named.of(
+ "uppercase string flow",
+ (BuilderConfigurer) builder -> builder.flow("CLIENT_CREDENTIALS"))));
+ }
+
+ @ParameterizedTest
+ @MethodSource("createFlowCases")
+ public void testCreateFlowConfiguration(BuilderConfigurer flowConfigurer) throws SQLException {
+ OAuthConfiguration.Builder builder = new OAuthConfiguration.Builder();
+ flowConfigurer.configure(builder);
+ OAuthConfiguration config =
+ builder.tokenUri(TOKEN_URI).clientId(CLIENT_ID).clientSecret(CLIENT_SECRET).build();
+
+ // Verify configuration creates correct provider type
+ OAuthTokenProvider provider = config.createTokenProvider();
+ assertInstanceOf(ClientCredentialsTokenProvider.class, provider);
+ }
+
+ @Test
+ public void testCreateClientCredentialsTokenProvider() throws SQLException {
+ OAuthConfiguration config =
+ new OAuthConfiguration.Builder()
+ .flow("client_credentials")
+ .tokenUri(TOKEN_URI)
+ .clientId(CLIENT_ID)
+ .clientSecret(CLIENT_SECRET)
+ .scope(SCOPE)
+ .build();
+
+ OAuthTokenProvider provider = config.createTokenProvider();
+
+ assertNotNull(provider);
+ assertInstanceOf(ClientCredentialsTokenProvider.class, provider);
+
+ ClientCredentialsTokenProvider ccProvider = (ClientCredentialsTokenProvider) provider;
+ assertEquals(URI.create(TOKEN_URI), ccProvider.tokenUri);
+ assertEquals(CLIENT_ID, ccProvider.clientAuth.getClientID().getValue());
+ assertEquals(Scope.parse(SCOPE), ccProvider.scope);
+ }
+
+ @Test
+ public void testCreateTokenExchangeTokenProviderWithAllOptions() throws SQLException {
+ String subjectTokenType = "urn:ietf:params:oauth:token-type:access_token";
+ String actorToken = "actor-token-value";
+ String actorTokenType = "urn:ietf:params:oauth:token-type:jwt";
+ String audience = "https://api.example.com";
+ String requestedTokenType = "urn:ietf:params:oauth:token-type:access_token";
+
+ OAuthConfiguration config =
+ new OAuthConfiguration.Builder()
+ .flow("token_exchange")
+ .tokenUri(TOKEN_URI)
+ .scope(SCOPE)
+ .clientId(CLIENT_ID)
+ .clientSecret(CLIENT_SECRET)
+ .resource(RESOURCE)
+ .subjectToken(SUBJECT_TOKEN)
+ .subjectTokenType(subjectTokenType)
+ .actorToken(actorToken)
+ .actorTokenType(actorTokenType)
+ .audience(audience)
+ .requestedTokenType(requestedTokenType)
+ .build();
+
+ OAuthTokenProvider provider = config.createTokenProvider();
+
+ assertNotNull(provider);
+ assertInstanceOf(TokenExchangeTokenProvider.class, provider);
+
+ TokenExchangeTokenProvider teProvider = (TokenExchangeTokenProvider) provider;
+ assertEquals(URI.create(TOKEN_URI), teProvider.tokenUri);
+ assertNotNull(teProvider.grant);
+ assertEquals(SUBJECT_TOKEN, teProvider.grant.getSubjectToken().getValue());
+ assertEquals(subjectTokenType, teProvider.grant.getSubjectTokenType().getURI().toString());
+ assertEquals(actorToken, teProvider.grant.getActorToken().getValue());
+ assertEquals(actorTokenType, teProvider.grant.getActorTokenType().getURI().toString());
+ assertNotNull(teProvider.grant.getAudience());
+ assertEquals(1, teProvider.grant.getAudience().size());
+ assertEquals(audience, teProvider.grant.getAudience().get(0).getValue());
+ assertEquals(requestedTokenType, teProvider.grant.getRequestedTokenType().getURI().toString());
+ assertEquals(Scope.parse(SCOPE), teProvider.scope);
+ assertEquals(Collections.singletonList(URI.create(RESOURCE)), teProvider.resources);
+
+ assertEquals(CLIENT_ID, teProvider.clientAuth.getClientID().getValue());
+ }
+
+ static Stream<Arguments> generalValidationErrorCases() {
+ return Stream.of(
+ Arguments.of(
+ Named.of(
+ "null flow",
+ (BuilderConfigurer) builder -> builder.flow((String) null).tokenUri(TOKEN_URI)),
+ "OAuth flow cannot be null or empty"),
+ Arguments.of(
+ Named.of(
+ "empty flow", (BuilderConfigurer) builder -> builder.flow("").tokenUri(TOKEN_URI)),
+ "OAuth flow cannot be null or empty"),
+ Arguments.of(
+ Named.of(
+ "invalid flow",
+ (BuilderConfigurer) builder -> builder.flow("invalid_flow").tokenUri(TOKEN_URI)),
+ "Unsupported OAuth flow: invalid_flow"),
+ Arguments.of(
+ Named.of(
+ "null tokenUri",
+ (BuilderConfigurer)
+ builder ->
+ builder
+ .flow("client_credentials")
+ .tokenUri((String) null)
+ .clientId(CLIENT_ID)
+ .clientSecret(CLIENT_SECRET)),
+ "Token URI cannot be null or empty"),
+ Arguments.of(
+ Named.of(
+ "empty tokenUri",
+ (BuilderConfigurer)
+ builder ->
+ builder
+ .flow("client_credentials")
+ .tokenUri("")
+ .clientId(CLIENT_ID)
+ .clientSecret(CLIENT_SECRET)),
+ "Token URI cannot be null or empty"),
+ Arguments.of(
+ Named.of(
+ "invalid tokenUri",
+ (BuilderConfigurer)
+ builder ->
+ builder
+ .flow("client_credentials")
+ .tokenUri("not a valid uri ://")
+ .clientId(CLIENT_ID)
+ .clientSecret(CLIENT_SECRET)),
+ // null means verify exception has message and cause
+ null),
+ Arguments.of(
+ Named.of(
+ "missing clientSecret",
+ (BuilderConfigurer)
+ builder ->
+ builder.flow("client_credentials").tokenUri(TOKEN_URI).clientId(CLIENT_ID)),
+ "clientSecret is required for client_credentials flow"));
+ }
+
+ @ParameterizedTest
+ @MethodSource("generalValidationErrorCases")
+ public void testGeneralValidationErrors(BuilderConfigurer configurer, String expectedMessage) {
+ SQLException exception =
+ assertThrows(
+ SQLException.class,
+ () -> {
+ OAuthConfiguration.Builder builder = new OAuthConfiguration.Builder();
+ configurer.configure(builder);
+ builder.build();
+ });
+
+ if (expectedMessage != null) {
+ assertEquals(expectedMessage, exception.getMessage());
+ } else {
+ assertNotNull(exception.getMessage());
+ assertNotNull(exception.getCause());
+ }
+ }
+
+ static Stream<Arguments> flowSpecificValidationErrorCases() {
+ return Stream.of(
+ // client_credentials flow validation
+ Arguments.of(
+ Named.of(
+ "client_credentials: missing clientId",
+ (BuilderConfigurer)
+ builder ->
+ builder
+ .flow("client_credentials")
+ .tokenUri(TOKEN_URI)
+ .clientSecret(CLIENT_SECRET)),
+ "clientId is required for client_credentials flow"),
+ Arguments.of(
+ Named.of(
+ "client_credentials: missing clientSecret",
+ (BuilderConfigurer)
+ builder ->
+ builder.flow("client_credentials").tokenUri(TOKEN_URI).clientId(CLIENT_ID)),
+ "clientSecret is required for client_credentials flow"),
+ // token_exchange flow validation
+ Arguments.of(
+ Named.of(
+ "token_exchange: missing subjectToken",
+ (BuilderConfigurer) builder -> builder.flow("token_exchange").tokenUri(TOKEN_URI)),
+ "subjectToken is required for token_exchange flow"),
+ Arguments.of(
+ Named.of(
+ "token_exchange: empty subjectToken",
+ (BuilderConfigurer)
+ builder ->
+ builder
+ .flow("token_exchange")
+ .tokenUri(TOKEN_URI)
+ .subjectToken("")
+ .subjectTokenType("urn:ietf:params:oauth:token-type:access_token")),
+ "subjectToken is required for token_exchange flow"),
+ Arguments.of(
+ Named.of(
+ "token_exchange: missing subjectTokenType",
+ (BuilderConfigurer)
+ builder ->
+ builder
+ .flow("token_exchange")
+ .tokenUri(TOKEN_URI)
+ .subjectToken(SUBJECT_TOKEN)),
+ "subjectTokenType is required for token_exchange flow"),
+ Arguments.of(
+ Named.of(
+ "token_exchange: empty subjectTokenType",
+ (BuilderConfigurer)
+ builder ->
+ builder
+ .flow("token_exchange")
+ .tokenUri(TOKEN_URI)
+ .subjectToken(SUBJECT_TOKEN)
+ .subjectTokenType("")),
+ "subjectTokenType is required for token_exchange flow"));
+ }
+
+ @ParameterizedTest
+ @MethodSource("flowSpecificValidationErrorCases")
+ public void testFlowSpecificValidationErrors(
+ BuilderConfigurer configurer, String expectedMessage) {
+ SQLException exception =
+ assertThrows(
+ SQLException.class,
+ () -> {
+ OAuthConfiguration.Builder builder = new OAuthConfiguration.Builder();
+ configurer.configure(builder);
+ builder.build();
+ });
+
+ assertEquals(expectedMessage, exception.getMessage());
+ }
+}
diff --git a/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/client/oauth/OAuthCredentialWriterTest.java b/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/client/oauth/OAuthCredentialWriterTest.java
new file mode 100644
index 000000000..1a33f7f0a
--- /dev/null
+++ b/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/client/oauth/OAuthCredentialWriterTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.driver.jdbc.client.oauth;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.sql.SQLException;
+import org.apache.arrow.flight.CallHeaders;
+import org.apache.arrow.flight.FlightCallHeaders;
+import org.apache.arrow.flight.auth2.Auth2Constants;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/** Tests for {@link OAuthCredentialWriter}. */
+@ExtendWith(MockitoExtension.class)
+public class OAuthCredentialWriterTest {
+
+ @Mock private OAuthTokenProvider mockTokenProvider;
+
+ @Test
+ public void testConstructorRejectsNullTokenProvider() {
+ assertThrows(NullPointerException.class, () -> new OAuthCredentialWriter(null));
+ }
+
+ @Test
+ public void testAcceptWritesBearerTokenToHeaders() throws SQLException {
+ String testToken = "test-access-token-12345";
+ when(mockTokenProvider.getValidToken()).thenReturn(testToken);
+
+ OAuthCredentialWriter writer = new OAuthCredentialWriter(mockTokenProvider);
+ CallHeaders headers = new FlightCallHeaders();
+
+ writer.accept(headers);
+
+ verify(mockTokenProvider).getValidToken();
+ assertEquals(
+ Auth2Constants.BEARER_PREFIX + testToken, headers.get(Auth2Constants.AUTHORIZATION_HEADER));
+ }
+
+ @Test
+ public void testAcceptThrowsOAuthTokenExceptionOnSQLException() throws SQLException {
+ SQLException sqlException = new SQLException("Token fetch failed");
+ when(mockTokenProvider.getValidToken()).thenThrow(sqlException);
+
+ OAuthCredentialWriter writer = new OAuthCredentialWriter(mockTokenProvider);
+ CallHeaders headers = new FlightCallHeaders();
+
+ OAuthTokenException exception =
+ assertThrows(OAuthTokenException.class, () -> writer.accept(headers));
+
+ assertEquals("Failed to obtain OAuth token", exception.getMessage());
+ assertEquals(sqlException, exception.getCause());
+ }
+
+ @Test
+ public void testAcceptCallsTokenProviderEachTime() throws SQLException {
+ when(mockTokenProvider.getValidToken())
+ .thenReturn("token1")
+ .thenReturn("token2")
+ .thenReturn("token3");
+
+ OAuthCredentialWriter writer = new OAuthCredentialWriter(mockTokenProvider);
+
+ CallHeaders headers1 = new FlightCallHeaders();
+ writer.accept(headers1);
+ assertEquals("Bearer token1", headers1.get(Auth2Constants.AUTHORIZATION_HEADER));
+
+ CallHeaders headers2 = new FlightCallHeaders();
+ writer.accept(headers2);
+ assertEquals("Bearer token2", headers2.get(Auth2Constants.AUTHORIZATION_HEADER));
+
+ CallHeaders headers3 = new FlightCallHeaders();
+ writer.accept(headers3);
+ assertEquals("Bearer token3", headers3.get(Auth2Constants.AUTHORIZATION_HEADER));
+ }
+}
diff --git a/flight/flight-sql-jdbc-driver/src/shade/LICENSE.txt b/flight/flight-sql-jdbc-driver/src/shade/LICENSE.txt
index 8bc43cbe0..8476bd999 100644
--- a/flight/flight-sql-jdbc-driver/src/shade/LICENSE.txt
+++ b/flight/flight-sql-jdbc-driver/src/shade/LICENSE.txt
@@ -345,6 +345,14 @@ License: https://www.apache.org/licenses/LICENSE-2.0
--------------------------------------------------------------------------------
+This binary artifact contains Nimbus OAuth 2.0 SDK with OpenID Connect extensions 11.20.1.
+
+Copyright: Copyright 2012-2024 Connect2id Ltd.
+Home page: https://connect2id.com/products/nimbus-oauth-openid-connect-sdk
+License: https://www.apache.org/licenses/LICENSE-2.0
+
+--------------------------------------------------------------------------------
+
This binary artifact contains Bouncycastle 1.80.
Copyright: Copyright (c) 2000-2024 The Legion of the Bouncy Castle Inc. (https://www.bouncycastle.org).
diff --git a/flight/flight-sql-jdbc-driver/src/test/java/org/apache/arrow/driver/jdbc/ITDriverJarValidation.java b/flight/flight-sql-jdbc-driver/src/test/java/org/apache/arrow/driver/jdbc/ITDriverJarValidation.java
index c1bd111fb..145744ad3 100644
--- a/flight/flight-sql-jdbc-driver/src/test/java/org/apache/arrow/driver/jdbc/ITDriverJarValidation.java
+++ b/flight/flight-sql-jdbc-driver/src/test/java/org/apache/arrow/driver/jdbc/ITDriverJarValidation.java
@@ -70,6 +70,10 @@ public class ITDriverJarValidation {
"LICENSE.txt",
"NOTICE.txt",
"arrow-git.properties",
+ "iso3166_1alpha2-codes.properties",
+ "iso3166_1alpha3-codes.properties",
+ "iso3166_1alpha-2-3-map.properties",
+ "iso3166_3-codes.properties",
"properties/flight.properties",
"META-INF/io.netty.versions.properties",
"META-INF/MANIFEST.MF",