This is an automated email from the ASF dual-hosted git repository.

jbonofre pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-java.git


The following commit(s) were added to refs/heads/main by this push:
     new 44c49baf6 GH-952: Add OAuth support (#953)
44c49baf6 is described below

commit 44c49baf6c2fdfcf20c8611f45c627c9438b2adb
Author: Hélder Gregório <[email protected]>
AuthorDate: Thu Jan 22 14:19:58 2026 +0000

    GH-952: Add OAuth support (#953)
    
    ## What's Changed
    
    - Add OAuth 2.0 support to the Flight SQL JDBC driver, including client
    credentials and token exchange flows
    - Integrate OAuth token acquisition into connection setup, wiring tokens
    through OAuthCredentialWriter and updating gRPC credential handling to
    fail fast on writer errors.
    - Document new OAuth connection properties and add example clients for
    both OAuth flows.
    - Connection Properties Added
      - oauth.flow
      - oauth.tokenUri
      - oauth.clientId
      - oauth.clientSecret
      - oauth.scope
      - oauth.resource
      - oauth.exchange.subjectToken
      - oauth.exchange.subjectTokenType
      - oauth.exchange.actorToken
      - oauth.exchange.actorTokenType
      - oauth.exchange.aud
      - oauth.exchange.requestedTokenType
    - Connection config now recognizes oauth.* and oauth.exchange.*
    properties and builds OAuth providers when oauth.flow is specified.
    - Add the com.nimbusds:oauth2-oidc-sdk dependency, plus mockwebserver
    for tests.
    
    
    Closes #952.
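
The connection properties listed above could be assembled roughly as follows. This is only a sketch: the class name `OAuthPropertiesSketch`, its two helper methods, and all sample values are hypothetical and not part of the commit; only the `oauth.*` property keys and the flow names come from the change description. The host and port in the commented-out JDBC URL are likewise placeholders.

```java
import java.util.Properties;

/** Hypothetical helper that builds driver properties for the two OAuth flows. */
public class OAuthPropertiesSketch {

  /** Properties for the client_credentials flow; scope is optional. */
  static Properties clientCredentials(
      String tokenUri, String clientId, String clientSecret, String scope) {
    Properties props = new Properties();
    props.setProperty("oauth.flow", "client_credentials");
    props.setProperty("oauth.tokenUri", tokenUri);
    props.setProperty("oauth.clientId", clientId);
    props.setProperty("oauth.clientSecret", clientSecret);
    if (scope != null && !scope.isEmpty()) {
      props.setProperty("oauth.scope", scope);
    }
    return props;
  }

  /** Properties for the token_exchange flow with only the required fields set. */
  static Properties tokenExchange(
      String tokenUri, String subjectToken, String subjectTokenType) {
    Properties props = new Properties();
    props.setProperty("oauth.flow", "token_exchange");
    props.setProperty("oauth.tokenUri", tokenUri);
    props.setProperty("oauth.exchange.subjectToken", subjectToken);
    props.setProperty("oauth.exchange.subjectTokenType", subjectTokenType);
    return props;
  }

  public static void main(String[] args) {
    Properties props =
        clientCredentials(
            "https://auth.example.com/oauth/token", "my-client", "my-secret", "read write");
    System.out.println("flow = " + props.getProperty("oauth.flow"));
    System.out.println("scope = " + props.getProperty("oauth.scope"));

    // With the Flight SQL JDBC driver on the classpath, the properties would be
    // handed to the driver along these lines (placeholder host and port):
    // Connection c =
    //     DriverManager.getConnection("jdbc:arrow-flight-sql://localhost:32010", props);
  }
}
```

Passing the values through a `Properties` object rather than the URI avoids having to URL-encode secrets and tokens, which is the approach the added documentation recommends.
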
---
 docs/source/flight_sql_jdbc_driver.rst             | 123 ++++++
 .../arrow/flight/grpc/CallCredentialAdapter.java   |  12 +-
 flight/flight-sql-jdbc-core/pom.xml                |  32 ++
 .../arrow/driver/jdbc/ArrowFlightConnection.java   |   1 +
 .../jdbc/client/ArrowFlightSqlClientHandler.java   |  23 +-
 .../client/oauth/AbstractOAuthTokenProvider.java   | 108 +++++
 .../oauth/ClientCredentialsTokenProvider.java      |  58 +++
 .../jdbc/client/oauth/OAuthConfiguration.java      | 240 +++++++++++
 .../jdbc/client/oauth/OAuthCredentialWriter.java   |  42 ++
 .../jdbc/client/oauth/OAuthTokenException.java     |  31 ++
 .../jdbc/client/oauth/OAuthTokenProvider.java      |  33 ++
 .../jdbc/client/oauth/OAuthTokenProviders.java     | 419 ++++++++++++++++++
 .../client/oauth/TokenExchangeTokenProvider.java   |  81 ++++
 .../arrow/driver/jdbc/client/oauth/TokenInfo.java  |  45 ++
 .../utils/ArrowFlightConnectionConfigImpl.java     |  52 +++
 .../arrow/driver/jdbc/OAuthIntegrationTest.java    | 474 +++++++++++++++++++++
 .../jdbc/client/oauth/OAuthConfigurationTest.java  | 296 +++++++++++++
 .../client/oauth/OAuthCredentialWriterTest.java    |  95 +++++
 .../flight-sql-jdbc-driver/src/shade/LICENSE.txt   |   8 +
 .../arrow/driver/jdbc/ITDriverJarValidation.java   |   4 +
 20 files changed, 2173 insertions(+), 4 deletions(-)

diff --git a/docs/source/flight_sql_jdbc_driver.rst b/docs/source/flight_sql_jdbc_driver.rst
index 180693094..4deb726b3 100644
--- a/docs/source/flight_sql_jdbc_driver.rst
+++ b/docs/source/flight_sql_jdbc_driver.rst
@@ -173,3 +173,126 @@ DriverManager#getConnection()
 <https://docs.oracle.com/javase/8/docs/api/java/sql/DriverManager.html#getConnection-java.lang.String-java.lang.String-java.lang.String->`_,
 the username and password supplied on the URI supercede the username and
 password arguments to the function call.
+
+OAuth 2.0 Authentication
+========================
+
+The driver supports OAuth 2.0 authentication for obtaining access tokens
+from an authorization server. Two OAuth flows are currently supported:
+
+* **Client Credentials** - For service-to-service authentication where no
+  user interaction is required. The application authenticates using its own
+  credentials (client ID and client secret).
+
+* **Token Exchange** (RFC 8693) - For exchanging one token for another,
+  commonly used for federated authentication, delegation, or impersonation
+  scenarios.
+
+OAuth Connection Properties
+---------------------------
+
+The following properties configure OAuth authentication. These properties
+should be provided via the ``Properties`` object when connecting, as they
+may contain special characters that are difficult to encode in a URI.
+
+**Common OAuth Properties**
+
+.. list-table::
+   :header-rows: 1
+
+   * - Parameter
+     - Type
+     - Required
+     - Default
+     - Description
+
+   * - oauth.flow
+     - String
+     - Yes (to enable OAuth)
+     - null
+     - The OAuth grant type. Supported values: ``client_credentials``,
+       ``token_exchange``
+
+   * - oauth.tokenUri
+     - String
+     - Yes
+     - null
+     - The OAuth 2.0 token endpoint URL (e.g.,
+       ``https://auth.example.com/oauth/token``)
+
+   * - oauth.clientId
+     - String
+     - Conditional
+     - null
+     - The OAuth 2.0 client ID. Required for ``client_credentials`` flow,
+       optional for ``token_exchange``
+
+   * - oauth.clientSecret
+     - String
+     - Conditional
+     - null
+     - The OAuth 2.0 client secret. Required for ``client_credentials`` flow,
+       optional for ``token_exchange``
+
+   * - oauth.scope
+     - String
+     - No
+     - null
+     - Space-separated list of OAuth scopes to request
+
+   * - oauth.resource
+     - String
+     - No
+     - null
+     - The resource indicator for the token request (RFC 8707)
+
+**Token Exchange Properties**
+
+These properties are specific to the ``token_exchange`` flow:
+
+.. list-table::
+   :header-rows: 1
+
+   * - Parameter
+     - Type
+     - Required
+     - Default
+     - Description
+
+   * - oauth.exchange.subjectToken
+     - String
+     - Yes
+     - null
+     - The subject token to exchange (e.g., a JWT from an identity provider)
+
+   * - oauth.exchange.subjectTokenType
+     - String
+     - Yes
+     - null
+     - The token type URI of the subject token. Common values:
+       ``urn:ietf:params:oauth:token-type:access_token``,
+       ``urn:ietf:params:oauth:token-type:jwt``
+
+   * - oauth.exchange.actorToken
+     - String
+     - No
+     - null
+     - The actor token for delegation/impersonation scenarios
+
+   * - oauth.exchange.actorTokenType
+     - String
+     - No
+     - null
+     - The token type URI of the actor token
+
+   * - oauth.exchange.aud
+     - String
+     - No
+     - null
+     - The target audience for the exchanged token
+
+   * - oauth.exchange.requestedTokenType
+     - String
+     - No
+     - null
+     - The desired token type for the exchanged token
diff --git a/flight/flight-core/src/main/java/org/apache/arrow/flight/grpc/CallCredentialAdapter.java b/flight/flight-core/src/main/java/org/apache/arrow/flight/grpc/CallCredentialAdapter.java
index f33e9b2f9..fe81f3fb2 100644
--- a/flight/flight-core/src/main/java/org/apache/arrow/flight/grpc/CallCredentialAdapter.java
+++ b/flight/flight-core/src/main/java/org/apache/arrow/flight/grpc/CallCredentialAdapter.java
@@ -18,6 +18,7 @@ package org.apache.arrow.flight.grpc;
 
 import io.grpc.CallCredentials;
 import io.grpc.Metadata;
+import io.grpc.Status;
 import java.util.concurrent.Executor;
 import java.util.function.Consumer;
 import org.apache.arrow.flight.CallHeaders;
@@ -36,9 +37,14 @@ public class CallCredentialAdapter extends CallCredentials {
      RequestInfo requestInfo, Executor executor, MetadataApplier metadataApplier) {
     executor.execute(
         () -> {
-          final Metadata headers = new Metadata();
-          credentialWriter.accept(new MetadataAdapter(headers));
-          metadataApplier.apply(headers);
+          try {
+            final Metadata headers = new Metadata();
+            credentialWriter.accept(new MetadataAdapter(headers));
+            metadataApplier.apply(headers);
+          } catch (Throwable t) {
+            metadataApplier.fail(
+                Status.UNAUTHENTICATED.withCause(t).withDescription(t.getMessage()));
+          }
         });
   }
 
diff --git a/flight/flight-sql-jdbc-core/pom.xml b/flight/flight-sql-jdbc-core/pom.xml
index bb191ca9e..da00baf32 100644
--- a/flight/flight-sql-jdbc-core/pom.xml
+++ b/flight/flight-sql-jdbc-core/pom.xml
@@ -120,6 +120,31 @@ under the License.
       <scope>test</scope>
     </dependency>
 
+    <dependency>
+      <groupId>com.squareup.okhttp3</groupId>
+      <artifactId>mockwebserver3</artifactId>
+      <version>5.3.2</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.squareup.okhttp3</groupId>
+      <artifactId>mockwebserver3-junit5</artifactId>
+      <version>5.3.2</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.squareup.okhttp3</groupId>
+      <artifactId>okhttp-jvm</artifactId>
+      <version>5.3.2</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.squareup.okio</groupId>
+      <artifactId>okio-jvm</artifactId>
+      <version>3.16.4</version>
+      <scope>test</scope>
+    </dependency>
+
     <dependency>
       <groupId>io.netty</groupId>
       <artifactId>netty-common</artifactId>
@@ -153,6 +178,13 @@ under the License.
       <artifactId>caffeine</artifactId>
       <version>3.2.3</version>
     </dependency>
+
+    <dependency>
+      <groupId>com.nimbusds</groupId>
+      <artifactId>oauth2-oidc-sdk</artifactId>
+      <version>11.20.1</version>
+    </dependency>
+
   </dependencies>
 
   <build>
diff --git a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightConnection.java b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightConnection.java
index f81233ec3..0e9c198f5 100644
--- a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightConnection.java
+++ b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightConnection.java
@@ -122,6 +122,7 @@ public final class ArrowFlightConnection extends AvaticaConnection {
           .withClientCache(config.useClientCache() ? new FlightClientCache() : null)
           .withConnectTimeout(config.getConnectTimeout())
           .withDriverVersion(driverVersion)
+          .withOAuthConfiguration(config.getOauthConfiguration())
           .build();
     } catch (final SQLException e) {
       try {
diff --git a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java
index 666996cd9..f0ea28423 100644
--- a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java
+++ b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java
@@ -32,6 +32,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import org.apache.arrow.driver.jdbc.client.oauth.OAuthConfiguration;
+import org.apache.arrow.driver.jdbc.client.oauth.OAuthCredentialWriter;
+import org.apache.arrow.driver.jdbc.client.oauth.OAuthTokenProvider;
 import org.apache.arrow.driver.jdbc.client.utils.ClientAuthenticationUtils;
 import org.apache.arrow.driver.jdbc.client.utils.FlightClientCache;
 import org.apache.arrow.driver.jdbc.client.utils.FlightLocationQueue;
@@ -675,6 +678,8 @@ public final class ArrowFlightSqlClientHandler implements AutoCloseable {
 
     @VisibleForTesting @Nullable Duration connectTimeout;
 
+    @VisibleForTesting @Nullable OAuthConfiguration oauthConfig;
+
     // These two middleware are for internal use within build() and should not be
     // exposed by builder
     // APIs.
@@ -714,6 +719,7 @@ public final class ArrowFlightSqlClientHandler implements AutoCloseable {
       this.clientKeyPath = original.clientKeyPath;
       this.allocator = original.allocator;
       this.catalog = original.catalog;
+      this.oauthConfig = original.oauthConfig;
 
       if (original.retainCookies) {
         this.cookieFactory = original.cookieFactory;
@@ -983,6 +989,17 @@ public final class ArrowFlightSqlClientHandler implements AutoCloseable {
       return this;
     }
 
+    /**
+     * Sets the OAuth configuration for this handler.
+     *
+     * @param oauthConfig the OAuth configuration
+     * @return this builder instance
+     */
+    public Builder withOAuthConfiguration(final OAuthConfiguration oauthConfig) {
+      this.oauthConfig = oauthConfig;
+      return this;
+    }
+
     public String getCacheKey() {
       return getLocation().toString();
     }
@@ -1070,7 +1087,11 @@ public final class ArrowFlightSqlClientHandler implements AutoCloseable {
             FlightGrpcUtils.createFlightClient(
                 allocator, channelBuilder.build(), clientBuilder.middleware());
         final ArrayList<CallOption> credentialOptions = new ArrayList<>();
-        if (isUsingUserPasswordAuth) {
+        // Authentication priority: OAuth > token > username/password
+        if (oauthConfig != null) {
+          OAuthTokenProvider tokenProvider = oauthConfig.createTokenProvider();
+          credentialOptions.add(new CredentialCallOption(new OAuthCredentialWriter(tokenProvider)));
+        } else if (isUsingUserPasswordAuth) {
           // If the authFactory has already been used for a handshake, use the existing
           // token.
           // This can occur if the authFactory is being re-used for a new connection
diff --git a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/oauth/AbstractOAuthTokenProvider.java b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/oauth/AbstractOAuthTokenProvider.java
new file mode 100644
index 000000000..9c377a585
--- /dev/null
+++ b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/oauth/AbstractOAuthTokenProvider.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.driver.jdbc.client.oauth;
+
+import com.nimbusds.oauth2.sdk.ParseException;
+import com.nimbusds.oauth2.sdk.Scope;
+import com.nimbusds.oauth2.sdk.TokenErrorResponse;
+import com.nimbusds.oauth2.sdk.TokenRequest;
+import com.nimbusds.oauth2.sdk.TokenResponse;
+import com.nimbusds.oauth2.sdk.auth.ClientAuthentication;
+import com.nimbusds.oauth2.sdk.token.AccessToken;
+import java.io.IOException;
+import java.net.URI;
+import java.sql.SQLException;
+import java.time.Instant;
+import org.apache.arrow.util.VisibleForTesting;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * Abstract base class for OAuth token providers that handles token caching, refresh logic, and
+ * common request/response handling.
+ */
+public abstract class AbstractOAuthTokenProvider implements OAuthTokenProvider {
+  protected static final int EXPIRATION_BUFFER_SECONDS = 30;
+  protected static final int DEFAULT_EXPIRATION_SECONDS = 3600;
+
+  private final Object tokenLock = new Object();
+  private volatile @Nullable TokenInfo cachedToken;
+
+  @VisibleForTesting URI tokenUri;
+
+  @VisibleForTesting @Nullable ClientAuthentication clientAuth;
+
+  @VisibleForTesting @Nullable Scope scope;
+
+  @Override
+  public String getValidToken() throws SQLException {
+    TokenInfo token = cachedToken;
+    if (token != null && !token.isExpired(EXPIRATION_BUFFER_SECONDS)) {
+      return token.getAccessToken();
+    }
+
+    synchronized (tokenLock) {
+      token = cachedToken;
+      if (token != null && !token.isExpired(EXPIRATION_BUFFER_SECONDS)) {
+        return token.getAccessToken();
+      }
+      cachedToken = fetchNewToken();
+      return cachedToken.getAccessToken();
+    }
+  }
+
+  /**
+   * Fetches a new token from the authorization server. This method handles the common
+   * request/response logic while delegating flow-specific request building to subclasses.
+   *
+   * @return the new token information
+   * @throws SQLException if token cannot be obtained
+   */
+  protected TokenInfo fetchNewToken() throws SQLException {
+    try {
+      TokenRequest request = buildTokenRequest();
+      TokenResponse response = TokenResponse.parse(request.toHTTPRequest().send());
+
+      if (!response.indicatesSuccess()) {
+        TokenErrorResponse errorResponse = response.toErrorResponse();
+        String errorMsg =
+            String.format(
+                "OAuth request failed: %s - %s",
+                errorResponse.getErrorObject().getCode(),
+                errorResponse.getErrorObject().getDescription());
+        throw new SQLException(errorMsg);
+      }
+
+      AccessToken accessToken = response.toSuccessResponse().getTokens().getAccessToken();
+      long expiresIn =
+          accessToken.getLifetime() > 0 ? accessToken.getLifetime() : DEFAULT_EXPIRATION_SECONDS;
+      Instant expiresAt = Instant.now().plusSeconds(expiresIn);
+
+      return new TokenInfo(accessToken.getValue(), expiresAt);
+    } catch (ParseException e) {
+      throw new SQLException("Failed to parse OAuth token response", e);
+    } catch (IOException e) {
+      throw new SQLException("Failed to send OAuth token request", e);
+    }
+  }
+
+  /**
+   * Builds the flow-specific token request.
+   *
+   * @return the token request to send to the authorization server
+   */
+  protected abstract TokenRequest buildTokenRequest();
+}
diff --git a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/oauth/ClientCredentialsTokenProvider.java b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/oauth/ClientCredentialsTokenProvider.java
new file mode 100644
index 000000000..7e6289819
--- /dev/null
+++ b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/oauth/ClientCredentialsTokenProvider.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.driver.jdbc.client.oauth;
+
+import com.nimbusds.oauth2.sdk.ClientCredentialsGrant;
+import com.nimbusds.oauth2.sdk.Scope;
+import com.nimbusds.oauth2.sdk.TokenRequest;
+import com.nimbusds.oauth2.sdk.auth.ClientSecretBasic;
+import com.nimbusds.oauth2.sdk.auth.Secret;
+import com.nimbusds.oauth2.sdk.id.ClientID;
+import java.net.URI;
+import java.util.Objects;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * OAuth 2.0 Client Credentials flow token provider (RFC 6749 Section 4.4).
+ *
+ * <p>This provider handles service-to-service authentication where no user interaction is required.
+ * Tokens are cached and automatically refreshed before expiration.
+ */
+public class ClientCredentialsTokenProvider extends AbstractOAuthTokenProvider {
+
+  /**
+   * Creates a new ClientCredentialsTokenProvider.
+   *
+   * @param tokenUri the OAuth token endpoint URI
+   * @param clientId the OAuth client ID
+   * @param clientSecret the OAuth client secret
+   * @param scope optional OAuth scopes (space-separated)
+   */
+  ClientCredentialsTokenProvider(
+      URI tokenUri, String clientId, String clientSecret, @Nullable String scope) {
+    this.tokenUri = Objects.requireNonNull(tokenUri, "tokenUri cannot be null");
+    Objects.requireNonNull(clientId, "clientId cannot be null");
+    Objects.requireNonNull(clientSecret, "clientSecret cannot be null");
+    this.clientAuth = new ClientSecretBasic(new ClientID(clientId), new Secret(clientSecret));
+    this.scope = (scope != null && !scope.isEmpty()) ? Scope.parse(scope) : null;
+  }
+
+  @Override
+  protected TokenRequest buildTokenRequest() {
+    return new TokenRequest(tokenUri, clientAuth, new ClientCredentialsGrant(), scope);
+  }
+}
diff --git a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/oauth/OAuthConfiguration.java b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/oauth/OAuthConfiguration.java
new file mode 100644
index 000000000..cba9d4c2e
--- /dev/null
+++ b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/oauth/OAuthConfiguration.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.driver.jdbc.client.oauth;
+
+import com.nimbusds.oauth2.sdk.GrantType;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.sql.SQLException;
+import java.util.Locale;
+import java.util.Objects;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/** Configuration class for OAuth settings parsed from connection properties. */
+public class OAuthConfiguration {
+
+  private final GrantType grantType;
+  private final URI tokenUri;
+  private final @Nullable String clientId;
+  private final @Nullable String clientSecret;
+  private final @Nullable String scope;
+  private final @Nullable String subjectToken;
+  private final @Nullable String subjectTokenType;
+  private final @Nullable String actorToken;
+  private final @Nullable String actorTokenType;
+  private final @Nullable String audience;
+  private final @Nullable String resource;
+  private final @Nullable String requestedTokenType;
+
+  private OAuthConfiguration(Builder builder) throws SQLException {
+    this.grantType = builder.grantType;
+    this.tokenUri = builder.tokenUri;
+    this.clientId = builder.clientId;
+    this.clientSecret = builder.clientSecret;
+    this.scope = builder.scope;
+    this.subjectToken = builder.subjectToken;
+    this.subjectTokenType = builder.subjectTokenType;
+    this.actorToken = builder.actorToken;
+    this.actorTokenType = builder.actorTokenType;
+    this.audience = builder.audience;
+    this.resource = builder.resource;
+    this.requestedTokenType = builder.requestedTokenType;
+
+    validate();
+  }
+
+  private void validate() throws SQLException {
+    Objects.requireNonNull(grantType, "OAuth grant type is required");
+    Objects.requireNonNull(tokenUri, "Token URI is required");
+
+    if (GrantType.CLIENT_CREDENTIALS.equals(grantType)) {
+      if (clientId == null || clientId.isEmpty()) {
+        throw new SQLException("clientId is required for client_credentials flow");
+      }
+      if (clientSecret == null || clientSecret.isEmpty()) {
+        throw new SQLException("clientSecret is required for client_credentials flow");
+      }
+    } else if (GrantType.TOKEN_EXCHANGE.equals(grantType)) {
+      if (subjectToken == null || subjectToken.isEmpty()) {
+        throw new SQLException("subjectToken is required for token_exchange flow");
+      }
+      if (subjectTokenType == null || subjectTokenType.isEmpty()) {
+        throw new SQLException("subjectTokenType is required for token_exchange flow");
+      }
+    } else {
+      throw new SQLException("Unsupported OAuth grant type: " + grantType);
+    }
+  }
+
+  /**
+   * Creates an OAuthTokenProvider based on the configured grant type.
+   *
+   * @return the token provider
+   * @throws SQLException if the grant type is not supported or configuration is invalid
+   */
+  public OAuthTokenProvider createTokenProvider() throws SQLException {
+    if (GrantType.CLIENT_CREDENTIALS.equals(grantType)) {
+      return OAuthTokenProviders.clientCredentials()
+          .tokenUri(tokenUri)
+          .clientId(clientId)
+          .clientSecret(clientSecret)
+          .scope(scope)
+          .build();
+    } else if (GrantType.TOKEN_EXCHANGE.equals(grantType)) {
+      OAuthTokenProviders.TokenExchangeBuilder builder =
+          OAuthTokenProviders.tokenExchange()
+              .tokenUri(tokenUri)
+              .subjectToken(subjectToken)
+              .subjectTokenType(subjectTokenType)
+              .actorToken(actorToken)
+              .actorTokenType(actorTokenType)
+              .audience(audience)
+              .requestedTokenType(requestedTokenType)
+              .scope(scope)
+              .resource(resource);
+
+      if (clientId != null && clientSecret != null) {
+        builder.clientCredentials(clientId, clientSecret);
+      }
+
+      return builder.build();
+    } else {
+      throw new SQLException("Unsupported OAuth grant type: " + grantType);
+    }
+  }
+
+  /** Builder for OAuthConfiguration. */
+  public static class Builder {
+    private GrantType grantType;
+    private URI tokenUri;
+    private @Nullable String clientId;
+    private @Nullable String clientSecret;
+    private @Nullable String scope;
+    private @Nullable String subjectToken;
+    private @Nullable String subjectTokenType;
+    private @Nullable String actorToken;
+    private @Nullable String actorTokenType;
+    private @Nullable String audience;
+    private @Nullable String resource;
+    private @Nullable String requestedTokenType;
+
+    /**
+     * Sets the OAuth grant type from a string value.
+     *
+     * <p>Accepts either user-friendly names ("client_credentials", "token_exchange") or the full
+     * URN format as defined in RFC 6749 and RFC 8693.
+     *
+     * @param flowStr the flow type string (e.g., "client_credentials", "token_exchange")
+     * @return this builder
+     * @throws SQLException if the flow string is invalid
+     */
+    public Builder flow(String flowStr) throws SQLException {
+      if (flowStr == null || flowStr.isEmpty()) {
+        throw new SQLException("OAuth flow cannot be null or empty");
+      }
+      try {
+        String normalized = flowStr.toLowerCase(Locale.ROOT);
+        // Map user-friendly names to URN format for token_exchange
+        if ("token_exchange".equals(normalized)) {
+          normalized = GrantType.TOKEN_EXCHANGE.getValue();
+        }
+        GrantType parsed = GrantType.parse(normalized);
+        if (!parsed.equals(GrantType.CLIENT_CREDENTIALS)
+            && !parsed.equals(GrantType.TOKEN_EXCHANGE)) {
+          throw new SQLException("Unsupported OAuth flow: " + flowStr);
+        }
+        this.grantType = parsed;
+      } catch (com.nimbusds.oauth2.sdk.ParseException e) {
+        throw new SQLException("Invalid OAuth flow: " + flowStr, e);
+      }
+      return this;
+    }
+
+    /**
+     * Sets the token URI.
+     *
+     * @param tokenUri the OAuth token endpoint URI
+     * @return this builder
+     * @throws SQLException if the URI is invalid
+     */
+    public Builder tokenUri(String tokenUri) throws SQLException {
+      if (tokenUri == null || tokenUri.isEmpty()) {
+        throw new SQLException("Token URI cannot be null or empty");
+      }
+      try {
+        this.tokenUri = new URI(tokenUri);
+      } catch (URISyntaxException e) {
+        throw new SQLException("Invalid token URI: " + tokenUri, e);
+      }
+      return this;
+    }
+
+    public Builder clientId(@Nullable String clientId) {
+      this.clientId = clientId;
+      return this;
+    }
+
+    public Builder clientSecret(@Nullable String clientSecret) {
+      this.clientSecret = clientSecret;
+      return this;
+    }
+
+    public Builder scope(@Nullable String scope) {
+      this.scope = scope;
+      return this;
+    }
+
+    public Builder subjectToken(@Nullable String subjectToken) {
+      this.subjectToken = subjectToken;
+      return this;
+    }
+
+    public Builder subjectTokenType(@Nullable String subjectTokenType) {
+      this.subjectTokenType = subjectTokenType;
+      return this;
+    }
+
+    public Builder actorToken(@Nullable String actorToken) {
+      this.actorToken = actorToken;
+      return this;
+    }
+
+    public Builder actorTokenType(@Nullable String actorTokenType) {
+      this.actorTokenType = actorTokenType;
+      return this;
+    }
+
+    public Builder audience(@Nullable String audience) {
+      this.audience = audience;
+      return this;
+    }
+
+    public Builder resource(@Nullable String resource) {
+      this.resource = resource;
+      return this;
+    }
+
+    public Builder requestedTokenType(@Nullable String requestedTokenType) {
+      this.requestedTokenType = requestedTokenType;
+      return this;
+    }
+
+    public OAuthConfiguration build() throws SQLException {
+      return new OAuthConfiguration(this);
+    }
+  }
+}
diff --git a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/oauth/OAuthCredentialWriter.java b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/oauth/OAuthCredentialWriter.java
new file mode 100644
index 000000000..0d4ad4689
--- /dev/null
+++ b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/oauth/OAuthCredentialWriter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.driver.jdbc.client.oauth;
+
+import java.sql.SQLException;
+import java.util.Objects;
+import java.util.function.Consumer;
+import org.apache.arrow.flight.CallHeaders;
+import org.apache.arrow.flight.auth2.Auth2Constants;
+
+/** Writes OAuth bearer tokens to Flight call headers. */
+public class OAuthCredentialWriter implements Consumer<CallHeaders> {
+  private final OAuthTokenProvider tokenProvider;
+
+  public OAuthCredentialWriter(OAuthTokenProvider tokenProvider) {
+    this.tokenProvider = Objects.requireNonNull(tokenProvider, "tokenProvider cannot be null");
+  }
+
+  @Override
+  public void accept(CallHeaders headers) {
+    try {
+      String token = tokenProvider.getValidToken();
+      headers.insert(Auth2Constants.AUTHORIZATION_HEADER, Auth2Constants.BEARER_PREFIX + token);
+    } catch (SQLException e) {
+      throw new OAuthTokenException("Failed to obtain OAuth token", e);
+    }
+  }
+}
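
Note on OAuthCredentialWriter: the pattern it follows can be sketched with the JDK alone. This is a minimal illustration under stated assumptions, not the driver's code: a plain Map stands in for Flight's CallHeaders, and the names BearerHeaderSketch and TokenSource, the lowercase "authorization" key, and the IllegalStateException wrapper are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical stand-in for the writer; a Map replaces Flight's CallHeaders. */
final class BearerHeaderSketch {

  /** Mirrors the shape of a token provider: the lookup may fail with a checked exception. */
  interface TokenSource {
    String getValidToken() throws Exception;
  }

  /** Returns a header writer that asks for a token on every call and fails fast on errors. */
  static Consumer<Map<String, String>> writer(TokenSource source) {
    return headers -> {
      try {
        // Header name and prefix are assumed literals for this sketch.
        headers.put("authorization", "Bearer " + source.getValidToken());
      } catch (Exception e) {
        // Consumer.accept cannot declare checked exceptions, so wrap and rethrow.
        throw new IllegalStateException("Failed to obtain OAuth token", e);
      }
    };
  }

  public static void main(String[] args) {
    Map<String, String> headers = new HashMap<>();
    writer(() -> "abc123").accept(headers);
    System.out.println(headers.get("authorization"));
  }
}
```

Wrapping the checked exception in an unchecked one is what lets a failure surface to the caller of accept() rather than being swallowed.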
diff --git a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/oauth/OAuthTokenException.java b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/oauth/OAuthTokenException.java
new file mode 100644
index 000000000..aceadb327
--- /dev/null
+++ b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/oauth/OAuthTokenException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.driver.jdbc.client.oauth;
+
+/**
+ * Runtime exception thrown when OAuth token operations fail. Used to wrap checked exceptions in
+ * contexts that don't allow them.
+ */
+public class OAuthTokenException extends RuntimeException {
+  public OAuthTokenException(String message) {
+    super(message);
+  }
+
+  public OAuthTokenException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}
diff --git a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/oauth/OAuthTokenProvider.java b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/oauth/OAuthTokenProvider.java
new file mode 100644
index 000000000..241611e43
--- /dev/null
+++ b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/oauth/OAuthTokenProvider.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.driver.jdbc.client.oauth;
+
+import java.sql.SQLException;
+
+/**
+ * Interface for OAuth token providers that handle token acquisition and refresh. Implementations
+ * should cache tokens and automatically refresh them before expiration.
+ */
+public interface OAuthTokenProvider {
+  /**
+   * Gets a valid OAuth access token, refreshing if necessary.
+   *
+   * @return a valid access token string
+   * @throws SQLException if token cannot be obtained
+   */
+  String getValidToken() throws SQLException;
+}
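
Note on the OAuthTokenProvider contract: the Javadoc asks implementations to cache tokens and refresh them before expiration. One way to read that contract is sketched below. It is a JDK-only illustration, not the driver's AbstractOAuthTokenProvider: the class CachingTokenSketch, its Token holder, the Supplier that stands in for the HTTP call, and the explicit "now" parameter (added so the behaviour is deterministic) are all assumptions.

```java
import java.time.Instant;
import java.util.function.Supplier;

/** Hypothetical caching wrapper; not the driver's AbstractOAuthTokenProvider. */
final class CachingTokenSketch {

  /** A fetched token plus the instant at which it stops being valid. */
  static final class Token {
    final String value;
    final Instant expiresAt;

    Token(String value, Instant expiresAt) {
      this.value = value;
      this.expiresAt = expiresAt;
    }
  }

  private final Supplier<Token> fetcher; // stands in for the HTTP call to the token endpoint
  private final long bufferSeconds; // refresh this many seconds before real expiry
  private Token cached;

  CachingTokenSketch(Supplier<Token> fetcher, long bufferSeconds) {
    this.fetcher = fetcher;
    this.bufferSeconds = bufferSeconds;
  }

  /** Returns the cached token, fetching a new one if none is cached or expiry is near. */
  synchronized String getValidToken(Instant now) {
    if (cached == null || now.plusSeconds(bufferSeconds).isAfter(cached.expiresAt)) {
      cached = fetcher.get();
    }
    return cached.value;
  }

  public static void main(String[] args) {
    Instant start = Instant.parse("2026-01-01T00:00:00Z");
    CachingTokenSketch provider =
        new CachingTokenSketch(() -> new Token("token", start.plusSeconds(3600)), 30);
    System.out.println(provider.getValidToken(start));
  }
}
```

The method is synchronized so that concurrent callers do not each trigger their own fetch; whether the real provider uses the same locking strategy is not shown in this part of the diff.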
diff --git a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/oauth/OAuthTokenProviders.java b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/oauth/OAuthTokenProviders.java
new file mode 100644
index 000000000..bbf7072d3
--- /dev/null
+++ b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/oauth/OAuthTokenProviders.java
@@ -0,0 +1,419 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.driver.jdbc.client.oauth;
+
+import com.nimbusds.oauth2.sdk.ParseException;
+import com.nimbusds.oauth2.sdk.Scope;
+import com.nimbusds.oauth2.sdk.auth.ClientAuthentication;
+import com.nimbusds.oauth2.sdk.auth.ClientSecretBasic;
+import com.nimbusds.oauth2.sdk.auth.Secret;
+import com.nimbusds.oauth2.sdk.id.Audience;
+import com.nimbusds.oauth2.sdk.id.ClientID;
+import com.nimbusds.oauth2.sdk.token.TokenTypeURI;
+import com.nimbusds.oauth2.sdk.token.TypelessAccessToken;
+import com.nimbusds.oauth2.sdk.tokenexchange.TokenExchangeGrant;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * Unified factory for creating OAuth token providers.
+ *
+ * <p>This class provides a single entry point for creating all OAuth token providers with a
+ * consistent builder API. It supports:
+ *
+ * <ul>
+ *   <li>Client Credentials flow (RFC 6749 Section 4.4)
+ *   <li>Token Exchange flow (RFC 8693)
+ * </ul>
+ *
+ * <p>Example usage:
+ *
+ * <pre>{@code
+ * // Client Credentials flow
+ * OAuthTokenProvider provider = OAuthTokenProviders.clientCredentials()
+ *     .tokenUri("https://auth.example.com/token")
+ *     .clientId("my-client")
+ *     .clientSecret("my-secret")
+ *     .scope("read write")
+ *     .build();
+ *
+ * // Token Exchange flow
+ * OAuthTokenProvider provider = OAuthTokenProviders.tokenExchange()
+ *     .tokenUri("https://auth.example.com/token")
+ *     .subjectToken("user-token")
+ *     .subjectTokenType("urn:ietf:params:oauth:token-type:access_token")
+ *     .build();
+ * }</pre>
+ */
+public final class OAuthTokenProviders {
+
+  private OAuthTokenProviders() {}
+
+  /**
+   * Creates a new builder for Client Credentials flow.
+   *
+   * @return a new ClientCredentialsBuilder instance
+   */
+  public static ClientCredentialsBuilder clientCredentials() {
+    return new ClientCredentialsBuilder();
+  }
+
+  /**
+   * Creates a new builder for Token Exchange flow.
+   *
+   * @return a new TokenExchangeBuilder instance
+   */
+  public static TokenExchangeBuilder tokenExchange() {
+    return new TokenExchangeBuilder();
+  }
+
+  /** Builder for creating {@link ClientCredentialsTokenProvider} instances. */
+  public static class ClientCredentialsBuilder {
+    private @Nullable URI tokenUri;
+    private @Nullable String clientId;
+    private @Nullable String clientSecret;
+    private @Nullable String scope;
+
+    ClientCredentialsBuilder() {}
+
+    /**
+     * Sets the OAuth token endpoint URI (required).
+     *
+     * @param tokenUri the token endpoint URI
+     * @return this builder
+     */
+    public ClientCredentialsBuilder tokenUri(URI tokenUri) {
+      this.tokenUri = Objects.requireNonNull(tokenUri, "tokenUri cannot be null");
+      return this;
+    }
+
+    /**
+     * Sets the OAuth token endpoint URI from a string (required).
+     *
+     * @param tokenUri the token endpoint URI string
+     * @return this builder
+     * @throws IllegalArgumentException if the URI is invalid
+     */
+    public ClientCredentialsBuilder tokenUri(String tokenUri) {
+      Objects.requireNonNull(tokenUri, "tokenUri cannot be null");
+      try {
+        this.tokenUri = new URI(tokenUri);
+      } catch (URISyntaxException e) {
+        throw new IllegalArgumentException("Invalid token URI: " + tokenUri, e);
+      }
+      return this;
+    }
+
+    /**
+     * Sets the OAuth client ID (required).
+     *
+     * @param clientId the client ID
+     * @return this builder
+     */
+    public ClientCredentialsBuilder clientId(String clientId) {
+      this.clientId = Objects.requireNonNull(clientId, "clientId cannot be null");
+      return this;
+    }
+
+    /**
+     * Sets the OAuth client secret (required).
+     *
+     * @param clientSecret the client secret
+     * @return this builder
+     */
+    public ClientCredentialsBuilder clientSecret(String clientSecret) {
+      this.clientSecret = Objects.requireNonNull(clientSecret, "clientSecret cannot be null");
+      return this;
+    }
+
+    /**
+     * Sets the OAuth scopes (optional).
+     *
+     * @param scope the space-separated scope string
+     * @return this builder
+     */
+    public ClientCredentialsBuilder scope(@Nullable String scope) {
+      this.scope = scope;
+      return this;
+    }
+
+    /**
+     * Builds a new ClientCredentialsTokenProvider instance.
+     *
+     * @return the configured ClientCredentialsTokenProvider
+     * @throws IllegalStateException if required parameters are missing
+     */
+    public ClientCredentialsTokenProvider build() {
+      if (tokenUri == null) {
+        throw new IllegalStateException("tokenUri is required");
+      }
+      if (clientId == null) {
+        throw new IllegalStateException("clientId is required");
+      }
+      if (clientSecret == null) {
+        throw new IllegalStateException("clientSecret is required");
+      }
+      return new ClientCredentialsTokenProvider(tokenUri, clientId, clientSecret, scope);
+    }
+  }
+
+  /** Builder for creating {@link TokenExchangeTokenProvider} instances. */
+  public static class TokenExchangeBuilder {
+    private @Nullable URI tokenUri;
+    private @Nullable String subjectToken;
+    private @Nullable String subjectTokenType;
+    private @Nullable String actorToken;
+    private @Nullable String actorTokenType;
+    private @Nullable String audience;
+    private @Nullable String requestedTokenType;
+    private @Nullable Scope scope;
+    private @Nullable List<URI> resources;
+    private @Nullable ClientAuthentication clientAuth;
+
+    TokenExchangeBuilder() {}
+
+    /**
+     * Sets the OAuth token endpoint URI (required).
+     *
+     * @param tokenUri the token endpoint URI
+     * @return this builder
+     */
+    public TokenExchangeBuilder tokenUri(URI tokenUri) {
+      this.tokenUri = Objects.requireNonNull(tokenUri, "tokenUri cannot be null");
+      return this;
+    }
+
+    /**
+     * Sets the OAuth token endpoint URI from a string (required).
+     *
+     * @param tokenUri the token endpoint URI string
+     * @return this builder
+     * @throws IllegalArgumentException if the URI is invalid
+     */
+    public TokenExchangeBuilder tokenUri(String tokenUri) {
+      Objects.requireNonNull(tokenUri, "tokenUri cannot be null");
+      try {
+        this.tokenUri = new URI(tokenUri);
+      } catch (URISyntaxException e) {
+        throw new IllegalArgumentException("Invalid token URI: " + tokenUri, e);
+      }
+      return this;
+    }
+
+    /**
+     * Sets the subject token to exchange (required).
+     *
+     * @param subjectToken the subject token value
+     * @return this builder
+     */
+    public TokenExchangeBuilder subjectToken(String subjectToken) {
+      this.subjectToken = Objects.requireNonNull(subjectToken, "subjectToken cannot be null");
+      return this;
+    }
+
+    /**
+     * Sets the type of the subject token (required).
+     *
+     * @param subjectTokenType the subject token type URI
+     * @return this builder
+     */
+    public TokenExchangeBuilder subjectTokenType(String subjectTokenType) {
+      this.subjectTokenType =
+          Objects.requireNonNull(subjectTokenType, "subjectTokenType cannot be null");
+      return this;
+    }
+
+    /**
+     * Sets the optional actor token for delegation scenarios.
+     *
+     * @param actorToken the actor token value
+     * @return this builder
+     */
+    public TokenExchangeBuilder actorToken(@Nullable String actorToken) {
+      this.actorToken = actorToken;
+      return this;
+    }
+
+    /**
+     * Sets the type of the actor token.
+     *
+     * @param actorTokenType the actor token type URI
+     * @return this builder
+     */
+    public TokenExchangeBuilder actorTokenType(@Nullable String actorTokenType) {
+      this.actorTokenType = actorTokenType;
+      return this;
+    }
+
+    /**
+     * Sets the target audience for the exchanged token.
+     *
+     * @param audience the target audience
+     * @return this builder
+     */
+    public TokenExchangeBuilder audience(@Nullable String audience) {
+      this.audience = audience;
+      return this;
+    }
+
+    /**
+     * Sets the requested token type for the exchanged token.
+     *
+     * @param requestedTokenType the requested token type URI
+     * @return this builder
+     */
+    public TokenExchangeBuilder requestedTokenType(@Nullable String requestedTokenType) {
+      this.requestedTokenType = requestedTokenType;
+      return this;
+    }
+
+    /**
+     * Sets the OAuth scopes for the token request.
+     *
+     * @param scope the OAuth scope object
+     * @return this builder
+     */
+    public TokenExchangeBuilder scope(@Nullable Scope scope) {
+      this.scope = scope;
+      return this;
+    }
+
+    /**
+     * Sets the OAuth scopes from a space-separated string.
+     *
+     * @param scope the space-separated scope string
+     * @return this builder
+     */
+    public TokenExchangeBuilder scope(@Nullable String scope) {
+      this.scope = (scope != null && !scope.isEmpty()) ? Scope.parse(scope) : null;
+      return this;
+    }
+
+    /**
+     * Sets the target resource URIs (RFC 8707).
+     *
+     * @param resources the list of resource URIs
+     * @return this builder
+     */
+    public TokenExchangeBuilder resources(@Nullable List<URI> resources) {
+      this.resources = resources;
+      return this;
+    }
+
+    /**
+     * Sets a single target resource URI (RFC 8707).
+     *
+     * @param resource the resource URI
+     * @return this builder
+     */
+    public TokenExchangeBuilder resource(@Nullable URI resource) {
+      this.resources = resource != null ? Collections.singletonList(resource) : null;
+      return this;
+    }
+
+    /**
+     * Sets a single target resource URI from a string (RFC 8707).
+     *
+     * @param resource the resource URI string
+     * @return this builder
+     */
+    public TokenExchangeBuilder resource(@Nullable String resource) {
+      if (resource != null && !resource.isEmpty()) {
+        this.resources = Collections.singletonList(URI.create(resource));
+      } else {
+        this.resources = null;
+      }
+      return this;
+    }
+
+    /**
+     * Sets the client authentication.
+     *
+     * @param clientAuth the client authentication object
+     * @return this builder
+     */
+    public TokenExchangeBuilder clientAuthentication(@Nullable ClientAuthentication clientAuth) {
+      this.clientAuth = clientAuth;
+      return this;
+    }
+
+    /**
+     * Sets client authentication using client ID and secret.
+     *
+     * @param clientId the client ID
+     * @param clientSecret the client secret
+     * @return this builder
+     */
+    public TokenExchangeBuilder clientCredentials(String clientId, String clientSecret) {
+      Objects.requireNonNull(clientId, "clientId cannot be null");
+      Objects.requireNonNull(clientSecret, "clientSecret cannot be null");
+      this.clientAuth = new ClientSecretBasic(new ClientID(clientId), new Secret(clientSecret));
+      return this;
+    }
+
+    /**
+     * Builds a new TokenExchangeTokenProvider instance.
+     *
+     * @return the configured TokenExchangeTokenProvider
+     * @throws IllegalStateException if required parameters are missing
+     */
+    public TokenExchangeTokenProvider build() {
+      if (tokenUri == null) {
+        throw new IllegalStateException("tokenUri is required");
+      }
+      if (subjectToken == null) {
+        throw new IllegalStateException("subjectToken is required");
+      }
+      if (subjectTokenType == null) {
+        throw new IllegalStateException("subjectTokenType is required");
+      }
+
+      TokenExchangeGrant grant = createGrant();
+      return new TokenExchangeTokenProvider(tokenUri, grant, clientAuth, scope, resources);
+    }
+
+    private TokenExchangeGrant createGrant() {
+      try {
+        TypelessAccessToken subjectAccessToken = new TypelessAccessToken(subjectToken);
+        TokenTypeURI subjectTypeUri = TokenTypeURI.parse(subjectTokenType);
+
+        TypelessAccessToken actorAccessToken =
+            actorToken != null ? new TypelessAccessToken(actorToken) : null;
+        TokenTypeURI actorTypeUri =
+            actorTokenType != null ? TokenTypeURI.parse(actorTokenType) : null;
+        TokenTypeURI requestedTypeUri =
+            requestedTokenType != null ? TokenTypeURI.parse(requestedTokenType) : null;
+        List<Audience> audienceList =
+            audience != null ? Collections.singletonList(new Audience(audience)) : null;
+
+        return new TokenExchangeGrant(
+            subjectAccessToken,
+            subjectTypeUri,
+            actorAccessToken,
+            actorTypeUri,
+            requestedTypeUri,
+            audienceList);
+      } catch (ParseException e) {
+        throw new IllegalStateException("Failed to create TokenExchangeGrant", e);
+      }
+    }
+  }
+}
diff --git a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/oauth/TokenExchangeTokenProvider.java b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/oauth/TokenExchangeTokenProvider.java
new file mode 100644
index 000000000..af433a271
--- /dev/null
+++ b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/oauth/TokenExchangeTokenProvider.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.driver.jdbc.client.oauth;
+
+import com.nimbusds.oauth2.sdk.Scope;
+import com.nimbusds.oauth2.sdk.TokenRequest;
+import com.nimbusds.oauth2.sdk.auth.ClientAuthentication;
+import com.nimbusds.oauth2.sdk.tokenexchange.TokenExchangeGrant;
+import java.net.URI;
+import java.util.List;
+import java.util.Objects;
+import org.apache.arrow.util.VisibleForTesting;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * OAuth 2.0 Token Exchange flow token provider (RFC 8693).
+ *
+ * <p>This provider exchanges one token for another, commonly used for federated authentication,
+ * delegation, or impersonation scenarios. Tokens are cached and automatically refreshed.
+ */
+public class TokenExchangeTokenProvider extends AbstractOAuthTokenProvider {
+
+  @VisibleForTesting TokenExchangeGrant grant;
+
+  @VisibleForTesting @Nullable List<URI> resources;
+
+  /**
+   * Creates a new TokenExchangeTokenProvider with full configuration.
+   *
+   * @param tokenUri the OAuth token endpoint URI
+   * @param grant the token exchange grant containing subject/actor token information
+   * @param clientAuth optional client authentication
+   * @param scope optional OAuth scopes
+   * @param resource optional list of target resource URIs (RFC 8707)
+   */
+  TokenExchangeTokenProvider(
+      URI tokenUri,
+      TokenExchangeGrant grant,
+      @Nullable ClientAuthentication clientAuth,
+      @Nullable Scope scope,
+      @Nullable List<URI> resource) {
+    this.tokenUri = Objects.requireNonNull(tokenUri, "tokenUri cannot be null");
+    this.grant = Objects.requireNonNull(grant, "grant cannot be null");
+    this.scope = scope;
+    this.resources = resource;
+    this.clientAuth = clientAuth;
+  }
+
+  @Override
+  protected TokenRequest buildTokenRequest() {
+    TokenRequest.Builder builder;
+    if (clientAuth != null) {
+      builder = new TokenRequest.Builder(tokenUri, clientAuth, grant);
+    } else {
+      builder = new TokenRequest.Builder(tokenUri, grant);
+    }
+
+    if (scope != null) {
+      builder.scope(scope);
+    }
+    if (resources != null) {
+      builder.resources(resources.toArray(new URI[0]));
+    }
+
+    return builder.build();
+  }
+}
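
Note on TokenExchangeTokenProvider: the provider delegates request construction to the Nimbus SDK, so the wire format is not visible in the diff. For orientation, RFC 8693 section 2.1 defines the request as a form-encoded POST body with grant_type, subject_token, subject_token_type and optional parameters such as audience and scope. The sketch below builds such a body with the JDK only (Java 10+ for the Charset overload of URLEncoder.encode). The class TokenExchangeFormSketch and its methods are hypothetical and are not part of the driver.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical illustration of an RFC 8693 request body; the driver uses the Nimbus SDK. */
final class TokenExchangeFormSketch {

  /** Collects the token-exchange parameters; null optional values are left out. */
  static Map<String, String> params(
      String subjectToken, String subjectTokenType, String audience, String scope) {
    Map<String, String> p = new LinkedHashMap<>();
    p.put("grant_type", "urn:ietf:params:oauth:grant-type:token-exchange");
    p.put("subject_token", subjectToken);
    p.put("subject_token_type", subjectTokenType);
    if (audience != null) {
      p.put("audience", audience);
    }
    if (scope != null) {
      p.put("scope", scope);
    }
    return p;
  }

  /** Encodes the parameters as application/x-www-form-urlencoded, keeping insertion order. */
  static String encode(Map<String, String> params) {
    return params.entrySet().stream()
        .map(e -> enc(e.getKey()) + "=" + enc(e.getValue()))
        .collect(Collectors.joining("&"));
  }

  private static String enc(String s) {
    return URLEncoder.encode(s, StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    Map<String, String> p =
        params("abc", "urn:ietf:params:oauth:token-type:jwt", null, "read write");
    System.out.println(encode(p));
  }
}
```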
diff --git a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/oauth/TokenInfo.java b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/oauth/TokenInfo.java
new file mode 100644
index 000000000..f47cc8b05
--- /dev/null
+++ b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/oauth/TokenInfo.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.driver.jdbc.client.oauth;
+
+import java.time.Instant;
+import java.util.Objects;
+
+/** Holds OAuth token information including the access token and expiration time. */
+public class TokenInfo {
+  private final String accessToken;
+  private final Instant expiresAt;
+
+  public TokenInfo(String accessToken, Instant expiresAt) {
+    this.accessToken = Objects.requireNonNull(accessToken, "accessToken cannot be null");
+    this.expiresAt = Objects.requireNonNull(expiresAt, "expiresAt cannot be null");
+  }
+
+  public String getAccessToken() {
+    return accessToken;
+  }
+
+  /**
+   * Checks if the token is expired or will expire within the buffer period.
+   *
+   * @param bufferSeconds seconds before actual expiration to consider token expired
+   * @return true if token should be refreshed
+   */
+  public boolean isExpired(int bufferSeconds) {
+    return Instant.now().plusSeconds(bufferSeconds).isAfter(expiresAt);
+  }
+}
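
Note on TokenInfo.isExpired: the comparison is strict (isAfter), so a token whose remaining lifetime equals the buffer exactly is still treated as valid. The sketch below restates the same arithmetic with the current time passed in, so the boundary case can be checked deterministically. The class ExpiryBufferSketch and the extra "now" parameter are assumptions made for illustration; the committed method reads Instant.now() itself.

```java
import java.time.Instant;

/** Hypothetical restatement of the expiry check with an injectable clock value. */
final class ExpiryBufferSketch {

  /** True when now + bufferSeconds lies strictly after expiresAt. */
  static boolean isExpired(Instant now, Instant expiresAt, int bufferSeconds) {
    return now.plusSeconds(bufferSeconds).isAfter(expiresAt);
  }

  public static void main(String[] args) {
    Instant now = Instant.parse("2026-01-22T14:00:00Z");
    Instant expiresAt = now.plusSeconds(60); // token has 60 seconds left

    System.out.println(isExpired(now, expiresAt, 30)); // buffer shorter than remaining lifetime
    System.out.println(isExpired(now, expiresAt, 60)); // buffer equals remaining lifetime
    System.out.println(isExpired(now, expiresAt, 61)); // buffer longer than remaining lifetime
  }
}
```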
diff --git a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/utils/ArrowFlightConnectionConfigImpl.java b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/utils/ArrowFlightConnectionConfigImpl.java
index 76ba964a5..d0ba74dbc 100644
--- a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/utils/ArrowFlightConnectionConfigImpl.java
+++ b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/utils/ArrowFlightConnectionConfigImpl.java
@@ -16,6 +16,7 @@
  */
 package org.apache.arrow.driver.jdbc.utils;
 
+import java.sql.SQLException;
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -23,6 +24,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Properties;
 import org.apache.arrow.driver.jdbc.ArrowFlightConnection;
+import org.apache.arrow.driver.jdbc.client.oauth.OAuthConfiguration;
 import org.apache.arrow.flight.CallHeaders;
 import org.apache.arrow.flight.CallOption;
 import org.apache.arrow.flight.FlightCallHeaders;
@@ -31,6 +33,7 @@ import org.apache.arrow.util.Preconditions;
 import org.apache.calcite.avatica.ConnectionConfig;
 import org.apache.calcite.avatica.ConnectionConfigImpl;
 import org.apache.calcite.avatica.ConnectionProperty;
+import org.checkerframework.checker.nullness.qual.Nullable;
 
 /** A {@link ConnectionConfig} for the {@link ArrowFlightConnection}. */
 public final class ArrowFlightConnectionConfigImpl extends ConnectionConfigImpl {
@@ -211,6 +214,38 @@ public final class ArrowFlightConnectionConfigImpl extends ConnectionConfigImpl
     return headers;
   }
 
+  /**
+   * Returns OAuth configuration if oauth.flow is specified, null otherwise.
+   *
+   * @return the OAuth configuration or null
+   * @throws SQLException if the OAuth configuration is invalid
+   */
+  public @Nullable OAuthConfiguration getOauthConfiguration() throws SQLException {
+    String flow = ArrowFlightConnectionProperty.OAUTH_FLOW.getString(properties);
+    if (flow == null) {
+      return null;
+    }
+
+    return new OAuthConfiguration.Builder()
+        .flow(flow)
+        .clientId(ArrowFlightConnectionProperty.OAUTH_CLIENT_ID.getString(properties))
+        .clientSecret(ArrowFlightConnectionProperty.OAUTH_CLIENT_SECRET.getString(properties))
+        .tokenUri(ArrowFlightConnectionProperty.OAUTH_TOKEN_URI.getString(properties))
+        .scope(ArrowFlightConnectionProperty.OAUTH_SCOPE.getString(properties))
+        .resource(ArrowFlightConnectionProperty.OAUTH_RESOURCE.getString(properties))
+        .subjectToken(
+            ArrowFlightConnectionProperty.OAUTH_EXCHANGE_SUBJECT_TOKEN.getString(properties))
+        .subjectTokenType(
+            ArrowFlightConnectionProperty.OAUTH_EXCHANGE_SUBJECT_TOKEN_TYPE.getString(properties))
+        .actorToken(ArrowFlightConnectionProperty.OAUTH_EXCHANGE_ACTOR_TOKEN.getString(properties))
+        .actorTokenType(
+            ArrowFlightConnectionProperty.OAUTH_EXCHANGE_ACTOR_TOKEN_TYPE.getString(properties))
+        .audience(ArrowFlightConnectionProperty.OAUTH_EXCHANGE_AUDIENCE.getString(properties))
+        .requestedTokenType(
+            ArrowFlightConnectionProperty.OAUTH_EXCHANGE_REQUESTED_TOKEN_TYPE.getString(properties))
+        .build();
+  }
+
   /** Custom {@link ConnectionProperty} for the {@link ArrowFlightConnectionConfigImpl}. */
   public enum ArrowFlightConnectionProperty implements ConnectionProperty {
     HOST("host", null, Type.STRING, true),
@@ -232,6 +267,23 @@ public final class ArrowFlightConnectionConfigImpl extends ConnectionConfigImpl
     CATALOG("catalog", null, Type.STRING, false),
     CONNECT_TIMEOUT_MILLIS("connectTimeoutMs", 10000, Type.NUMBER, false),
     USE_CLIENT_CACHE("useClientCache", true, Type.BOOLEAN, false),
+
+    // OAuth configuration properties
+    OAUTH_FLOW("oauth.flow", null, Type.STRING, false),
+    OAUTH_CLIENT_ID("oauth.clientId", null, Type.STRING, false),
+    OAUTH_CLIENT_SECRET("oauth.clientSecret", null, Type.STRING, false),
+    OAUTH_TOKEN_URI("oauth.tokenUri", null, Type.STRING, false),
+    OAUTH_SCOPE("oauth.scope", null, Type.STRING, false),
+    OAUTH_RESOURCE("oauth.resource", null, Type.STRING, false),
+
+    // Token exchange specific properties
+    OAUTH_EXCHANGE_SUBJECT_TOKEN("oauth.exchange.subjectToken", null, Type.STRING, false),
+    OAUTH_EXCHANGE_SUBJECT_TOKEN_TYPE("oauth.exchange.subjectTokenType", null, Type.STRING, false),
+    OAUTH_EXCHANGE_ACTOR_TOKEN("oauth.exchange.actorToken", null, Type.STRING, false),
+    OAUTH_EXCHANGE_ACTOR_TOKEN_TYPE("oauth.exchange.actorTokenType", null, Type.STRING, false),
+    OAUTH_EXCHANGE_AUDIENCE("oauth.exchange.aud", null, Type.STRING, false),
+    OAUTH_EXCHANGE_REQUESTED_TOKEN_TYPE(
+        "oauth.exchange.requestedTokenType", null, Type.STRING, false),
     ;
 
     private final String camelName;
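
Note on the new connection properties: from a caller's point of view, the oauth.* and oauth.exchange.* keys registered in the enum are ordinary JDBC connection properties. The sketch below only assembles java.util.Properties objects and opens no connection. The property names come from the enum; the flow values "client_credentials" and "token_exchange", the helper class OAuthPropertiesSketch, and all sample values are assumptions and should be checked against the driver documentation added in this commit.

```java
import java.util.Properties;

/** Hypothetical helper that assembles OAuth connection properties for the JDBC driver. */
final class OAuthPropertiesSketch {

  /** Properties for the client credentials flow; scope may be null. */
  static Properties clientCredentials(
      String tokenUri, String clientId, String clientSecret, String scope) {
    Properties p = new Properties();
    p.setProperty("oauth.flow", "client_credentials"); // assumed flow value
    p.setProperty("oauth.tokenUri", tokenUri);
    p.setProperty("oauth.clientId", clientId);
    p.setProperty("oauth.clientSecret", clientSecret);
    if (scope != null) {
      p.setProperty("oauth.scope", scope);
    }
    return p;
  }

  /** Properties for the token exchange flow; audience may be null. */
  static Properties tokenExchange(
      String tokenUri, String subjectToken, String subjectTokenType, String audience) {
    Properties p = new Properties();
    p.setProperty("oauth.flow", "token_exchange"); // assumed flow value
    p.setProperty("oauth.tokenUri", tokenUri);
    p.setProperty("oauth.exchange.subjectToken", subjectToken);
    p.setProperty("oauth.exchange.subjectTokenType", subjectTokenType);
    if (audience != null) {
      // The audience property key is abbreviated to "aud" in the enum.
      p.setProperty("oauth.exchange.aud", audience);
    }
    return p;
  }

  public static void main(String[] args) {
    Properties p =
        clientCredentials("https://auth.example.com/token", "my-client", "my-secret", "read write");
    // Print only the key names so the secret never reaches the console.
    p.stringPropertyNames().stream().sorted().forEach(System.out::println);
  }
}
```

The resulting Properties object would be passed to DriverManager.getConnection together with the driver's JDBC URL.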
diff --git a/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/OAuthIntegrationTest.java b/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/OAuthIntegrationTest.java
new file mode 100644
index 000000000..5e782db03
--- /dev/null
+++ b/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/OAuthIntegrationTest.java
@@ -0,0 +1,474 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.driver.jdbc;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.net.URI;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import mockwebserver3.MockResponse;
+import mockwebserver3.MockWebServer;
+import mockwebserver3.RecordedRequest;
+import mockwebserver3.junit5.StartStop;
+import org.apache.arrow.driver.jdbc.authentication.TokenAuthentication;
+import org.apache.arrow.driver.jdbc.utils.ArrowFlightConnectionConfigImpl.ArrowFlightConnectionProperty;
+import org.apache.arrow.driver.jdbc.utils.MockFlightSqlProducer;
+import org.apache.arrow.flight.sql.FlightSqlProducer.Schemas;
+import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetCatalogs;
+import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetDbSchemas;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.util.AutoCloseables;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+/**
+ * Integration tests for OAuth authentication flows in the JDBC driver.
+ *
+ * <p>These tests verify that OAuth tokens obtained from an OAuth server are correctly used in
+ * Flight SQL requests.
+ */
+public class OAuthIntegrationTest {
+
+  private static final String VALID_ACCESS_TOKEN = "valid-oauth-access-token-12345";
+  private static final String CLIENT_ID = "test-client-id";
+  private static final String CLIENT_SECRET = "test-client-secret";
+  private static final String SUBJECT_TOKEN = "original-subject-token";
+  private static final String SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:jwt";
+  private static final String TEST_SCOPE = "dremio.all";
+
+  private static final MockFlightSqlProducer FLIGHT_SQL_PRODUCER = new MockFlightSqlProducer();
+
+  @RegisterExtension public static FlightServerTestExtension FLIGHT_SERVER_TEST_EXTENSION;
+
+  static {
+    FLIGHT_SERVER_TEST_EXTENSION =
+        new FlightServerTestExtension.Builder()
+            .authentication(new TokenAuthentication.Builder().token(VALID_ACCESS_TOKEN).build())
+            .producer(FLIGHT_SQL_PRODUCER)
+            .build();
+  }
+
+  @StartStop private final MockWebServer oauthServer = new MockWebServer();
+  private URI tokenEndpoint;
+
+  @BeforeAll
+  public static void setUpClass() {
+    // Register a simple catalog query handler
+    FLIGHT_SQL_PRODUCER.addCatalogQuery(
+        CommandGetCatalogs.getDefaultInstance(),
+        listener -> {
+          try (BufferAllocator allocator = new RootAllocator();
+              VectorSchemaRoot root =
+                  VectorSchemaRoot.create(Schemas.GET_CATALOGS_SCHEMA, allocator)) {
+            root.setRowCount(0);
+            listener.start(root);
+            listener.putNext();
+          } catch (Throwable t) {
+            listener.error(t);
+          } finally {
+            listener.completed();
+          }
+        });
+
+    // Register a simple schema query handler for getSchemas()
+    FLIGHT_SQL_PRODUCER.addCatalogQuery(
+        CommandGetDbSchemas.getDefaultInstance(),
+        listener -> {
+          try (BufferAllocator allocator = new RootAllocator();
+              VectorSchemaRoot root =
+                  VectorSchemaRoot.create(Schemas.GET_SCHEMAS_SCHEMA, allocator)) {
+            root.setRowCount(0);
+            listener.start(root);
+            listener.putNext();
+          } catch (Throwable t) {
+            listener.error(t);
+          } finally {
+            listener.completed();
+          }
+        });
+  }
+
+  @AfterAll
+  public static void tearDownClass() {
+    AutoCloseables.closeNoChecked(FLIGHT_SQL_PRODUCER);
+  }
+
+  @BeforeEach
+  public void setUp() {
+    tokenEndpoint = oauthServer.url("/oauth/token").uri();
+  }
+
+  @AfterEach
+  public void tearDown() {
+    oauthServer.close();
+  }
+
+  // Helper methods for mock OAuth responses
+
+  private void enqueueSuccessfulTokenResponse() {
+    enqueueSuccessfulTokenResponse(VALID_ACCESS_TOKEN, 3600);
+  }
+
+  private void enqueueSuccessfulTokenResponse(String token, int expiresIn) {
+    String body =
+        String.format(
+            "{\"access_token\":\"%s\",\"token_type\":\"Bearer\",\"expires_in\":%d}",
+            token, expiresIn);
+    oauthServer.enqueue(
+        new MockResponse.Builder()
+            .code(200)
+            .setHeader("Content-Type", "application/json")
+            .body(body)
+            .build());
+  }
+
+  private void enqueueErrorResponse(String error, String description) {
+    String body =
+        String.format("{\"error\":\"%s\",\"error_description\":\"%s\"}", error, description);
+    oauthServer.enqueue(
+        new MockResponse.Builder()
+            .code(400)
+            .setHeader("Content-Type", "application/json")
+            .body(body)
+            .build());
+  }
+
+  private Properties createBaseProperties() {
+    Properties props = new Properties();
+    props.put(ArrowFlightConnectionProperty.HOST.camelName(), "localhost");
+    props.put(
+        ArrowFlightConnectionProperty.PORT.camelName(), FLIGHT_SERVER_TEST_EXTENSION.getPort());
+    props.put(ArrowFlightConnectionProperty.USE_ENCRYPTION.camelName(), false);
+    return props;
+  }
+
+  private String getJdbcUrl() {
+    return String.format(
+        "jdbc:arrow-flight-sql://localhost:%d", FLIGHT_SERVER_TEST_EXTENSION.getPort());
+  }
+
+  // ==================== Client Credentials Flow Tests ====================
+
+  @Test
+  public void testClientCredentialsFlowSuccess() throws Exception {
+    enqueueSuccessfulTokenResponse();
+
+    Properties props = createBaseProperties();
+    props.put(ArrowFlightConnectionProperty.OAUTH_FLOW.camelName(), "client_credentials");
+    props.put(ArrowFlightConnectionProperty.OAUTH_TOKEN_URI.camelName(), tokenEndpoint.toString());
+    props.put(ArrowFlightConnectionProperty.OAUTH_CLIENT_ID.camelName(), CLIENT_ID);
+    props.put(ArrowFlightConnectionProperty.OAUTH_CLIENT_SECRET.camelName(), CLIENT_SECRET);
+    props.put(ArrowFlightConnectionProperty.OAUTH_SCOPE.camelName(), TEST_SCOPE);
+
+    try (Connection conn = DriverManager.getConnection(getJdbcUrl(), props)) {
+      assertFalse(conn.isClosed());
+      // Trigger a Flight call to force OAuth token retrieval
+      conn.getMetaData().getCatalogs().close();
+    }
+
+    // Verify OAuth request was made
+    RecordedRequest oauthRequest = oauthServer.takeRequest(5, TimeUnit.SECONDS);
+    assertNotNull(oauthRequest, "OAuth request should have been made");
+    assertEquals("POST", oauthRequest.getMethod());
+    String body = oauthRequest.getBody().utf8();
+    assertTrue(body.contains("grant_type=client_credentials"));
+    assertTrue(body.contains("scope=" + TEST_SCOPE));
+  }
+
+  @Test
+  public void testClientCredentialsFlowWithUrlParameters() throws Exception {
+    enqueueSuccessfulTokenResponse();
+
+    String url =
+        String.format(
+            "jdbc:arrow-flight-sql://localhost:%d?useEncryption=false"
+                + "&oauth.flow=client_credentials"
+                + "&oauth.tokenUri=%s"
+                + "&oauth.clientId=%s"
+                + "&oauth.clientSecret=%s",
+            FLIGHT_SERVER_TEST_EXTENSION.getPort(),
+            tokenEndpoint.toString(),
+            CLIENT_ID,
+            CLIENT_SECRET);
+
+    try (Connection conn = DriverManager.getConnection(url)) {
+      conn.getMetaData().getCatalogs().close();
+    }
+
+    assertEquals(1, oauthServer.getRequestCount());
+  }
+
+  @Test
+  public void testClientCredentialsFlowInvalidCredentials() throws Exception {
+    enqueueErrorResponse("invalid_client", "Client authentication failed");
+
+    Properties props = createBaseProperties();
+    props.put(ArrowFlightConnectionProperty.OAUTH_FLOW.camelName(), "client_credentials");
+    props.put(ArrowFlightConnectionProperty.OAUTH_TOKEN_URI.camelName(), tokenEndpoint.toString());
+    props.put(ArrowFlightConnectionProperty.OAUTH_CLIENT_ID.camelName(), "wrong-client");
+    props.put(ArrowFlightConnectionProperty.OAUTH_CLIENT_SECRET.camelName(), "wrong-secret");
+
+    Exception ex =
+        assertThrows(
+            Exception.class,
+            () -> {
+              try (Connection conn = DriverManager.getConnection(getJdbcUrl(), props)) {
+                conn.getMetaData().getCatalogs().close();
+              }
+            });
+    // Verify the error message contains the OAuth error somewhere in the exception chain
+    assertTrue(
+        containsInExceptionChain(ex, "invalid_client"),
+        "Exception chain should contain 'invalid_client'");
+  }
+
+  private boolean containsInExceptionChain(Throwable t, String message) {
+    while (t != null) {
+      if (t.getMessage() != null && t.getMessage().contains(message)) {
+        return true;
+      }
+      t = t.getCause();
+    }
+    return false;
+  }
+
+  // ==================== Token Exchange Flow Tests ====================
+
+  @Test
+  public void testTokenExchangeFlowMinimalParameters() throws Exception {
+    enqueueSuccessfulTokenResponse();
+
+    Properties props = createBaseProperties();
+    props.put(ArrowFlightConnectionProperty.OAUTH_FLOW.camelName(), "token_exchange");
+    props.put(ArrowFlightConnectionProperty.OAUTH_TOKEN_URI.camelName(), tokenEndpoint.toString());
+    props.put(
+        ArrowFlightConnectionProperty.OAUTH_EXCHANGE_SUBJECT_TOKEN.camelName(), SUBJECT_TOKEN);
+    props.put(
+        ArrowFlightConnectionProperty.OAUTH_EXCHANGE_SUBJECT_TOKEN_TYPE.camelName(),
+        SUBJECT_TOKEN_TYPE);
+
+    try (Connection conn = DriverManager.getConnection(getJdbcUrl(), props)) {
+      conn.getMetaData().getCatalogs().close();
+    }
+
+    RecordedRequest oauthRequest = oauthServer.takeRequest(5, TimeUnit.SECONDS);
+    assertNotNull(oauthRequest, "OAuth request should have been made");
+    String body = oauthRequest.getBody().utf8();
+    assertTrue(
+        body.contains("grant_type=urn%3Aietf%3Aparams%3Aoauth%3Agrant-type%3Atoken-exchange"),
+        "Should contain token exchange grant type");
+    assertTrue(body.contains("subject_token=" + SUBJECT_TOKEN));
+  }
+
+  @Test
+  public void testTokenExchangeFlowWithAllParameters() throws Exception {
+    enqueueSuccessfulTokenResponse();
+
+    String actorToken = "actor-token-value";
+    String actorTokenType = "urn:ietf:params:oauth:token-type:access_token";
+    String audience = "https://api.example.com";
+    String resource = "https://api.example.com/resource";
+    String requestedTokenType = "urn:ietf:params:oauth:token-type:access_token";
+
+    Properties props = createBaseProperties();
+    props.put(ArrowFlightConnectionProperty.OAUTH_FLOW.camelName(), "token_exchange");
+    props.put(ArrowFlightConnectionProperty.OAUTH_TOKEN_URI.camelName(), tokenEndpoint.toString());
+    props.put(ArrowFlightConnectionProperty.OAUTH_CLIENT_ID.camelName(), CLIENT_ID);
+    props.put(ArrowFlightConnectionProperty.OAUTH_CLIENT_SECRET.camelName(), CLIENT_SECRET);
+    props.put(ArrowFlightConnectionProperty.OAUTH_SCOPE.camelName(), TEST_SCOPE);
+    props.put(
+        ArrowFlightConnectionProperty.OAUTH_EXCHANGE_SUBJECT_TOKEN.camelName(), SUBJECT_TOKEN);
+    props.put(
+        ArrowFlightConnectionProperty.OAUTH_EXCHANGE_SUBJECT_TOKEN_TYPE.camelName(),
+        SUBJECT_TOKEN_TYPE);
+    props.put(ArrowFlightConnectionProperty.OAUTH_EXCHANGE_ACTOR_TOKEN.camelName(), actorToken);
+    props.put(
+        ArrowFlightConnectionProperty.OAUTH_EXCHANGE_ACTOR_TOKEN_TYPE.camelName(), actorTokenType);
+    props.put(ArrowFlightConnectionProperty.OAUTH_EXCHANGE_AUDIENCE.camelName(), audience);
+    props.put(ArrowFlightConnectionProperty.OAUTH_RESOURCE.camelName(), resource);
+    props.put(
+        ArrowFlightConnectionProperty.OAUTH_EXCHANGE_REQUESTED_TOKEN_TYPE.camelName(),
+        requestedTokenType);
+
+    try (Connection conn = DriverManager.getConnection(getJdbcUrl(), props)) {
+      conn.getMetaData().getCatalogs().close();
+    }
+
+    RecordedRequest oauthRequest = oauthServer.takeRequest(5, TimeUnit.SECONDS);
+    assertNotNull(oauthRequest, "OAuth request should have been made");
+    String body = oauthRequest.getBody().utf8();
+    assertTrue(body.contains("subject_token=" + SUBJECT_TOKEN));
+    assertTrue(body.contains("actor_token=" + actorToken));
+  }
+
+  @Test
+  public void testTokenExchangeFlowWithClientAuthentication() throws Exception {
+    enqueueSuccessfulTokenResponse();
+
+    Properties props = createBaseProperties();
+    props.put(ArrowFlightConnectionProperty.OAUTH_FLOW.camelName(), "token_exchange");
+    props.put(ArrowFlightConnectionProperty.OAUTH_TOKEN_URI.camelName(), tokenEndpoint.toString());
+    props.put(ArrowFlightConnectionProperty.OAUTH_CLIENT_ID.camelName(), CLIENT_ID);
+    props.put(ArrowFlightConnectionProperty.OAUTH_CLIENT_SECRET.camelName(), CLIENT_SECRET);
+    props.put(
+        ArrowFlightConnectionProperty.OAUTH_EXCHANGE_SUBJECT_TOKEN.camelName(), SUBJECT_TOKEN);
+    props.put(
+        ArrowFlightConnectionProperty.OAUTH_EXCHANGE_SUBJECT_TOKEN_TYPE.camelName(),
+        SUBJECT_TOKEN_TYPE);
+
+    try (Connection conn = DriverManager.getConnection(getJdbcUrl(), props)) {
+      conn.getMetaData().getCatalogs().close();
+    }
+
+    RecordedRequest oauthRequest = oauthServer.takeRequest(5, TimeUnit.SECONDS);
+    assertNotNull(oauthRequest, "OAuth request should have been made");
+    String authHeader = oauthRequest.getHeaders().get("Authorization");
+    assertNotNull(authHeader, "Should have Basic auth header for client authentication");
+    assertTrue(authHeader.startsWith("Basic "));
+  }
+
+  // ==================== Token Caching Tests ====================
+
+  @Test
+  public void testTokenCachingAcrossMultipleOperations() throws Exception {
+    enqueueSuccessfulTokenResponse();
+
+    Properties props = createBaseProperties();
+    props.put(ArrowFlightConnectionProperty.OAUTH_FLOW.camelName(), "client_credentials");
+    props.put(ArrowFlightConnectionProperty.OAUTH_TOKEN_URI.camelName(), tokenEndpoint.toString());
+    props.put(ArrowFlightConnectionProperty.OAUTH_CLIENT_ID.camelName(), CLIENT_ID);
+    props.put(ArrowFlightConnectionProperty.OAUTH_CLIENT_SECRET.camelName(), CLIENT_SECRET);
+
+    try (Connection conn = DriverManager.getConnection(getJdbcUrl(), props)) {
+      // Execute multiple operations
+      conn.isValid(5);
+      conn.getMetaData().getCatalogs().close();
+      conn.getMetaData().getSchemas().close();
+    }
+
+    // Should only have made one OAuth request due to caching
+    assertEquals(1, oauthServer.getRequestCount());
+  }
+
+  @Test
+  public void testTokenRefreshAfterExpiration() throws Exception {
+    enqueueSuccessfulTokenResponse(VALID_ACCESS_TOKEN, 1);
+    enqueueSuccessfulTokenResponse(VALID_ACCESS_TOKEN, 3600);
+
+    Properties props = createBaseProperties();
+    props.put(ArrowFlightConnectionProperty.OAUTH_FLOW.camelName(), "token_exchange");
+    props.put(ArrowFlightConnectionProperty.OAUTH_TOKEN_URI.camelName(), tokenEndpoint.toString());
+    props.put(
+        ArrowFlightConnectionProperty.OAUTH_EXCHANGE_SUBJECT_TOKEN.camelName(), SUBJECT_TOKEN);
+    props.put(
+        ArrowFlightConnectionProperty.OAUTH_EXCHANGE_SUBJECT_TOKEN_TYPE.camelName(),
+        SUBJECT_TOKEN_TYPE);
+
+    try (Connection conn = DriverManager.getConnection(getJdbcUrl(), props)) {
+      // First operation triggers initial token fetch
+      conn.getMetaData().getCatalogs().close();
+
+      // Token with 1s expiry is immediately considered expired (due to 30s buffer)
+      // so the next operation should trigger a refresh
+      conn.getMetaData().getCatalogs().close();
+    }
+
+    // Should have made exactly 2 OAuth requests: initial + refresh
+    assertEquals(2, oauthServer.getRequestCount());
+  }
+
+  // ==================== Error Handling Tests ====================
+
+  @Test
+  public void testMissingRequiredParametersClientCredentials() {
+    Properties props = createBaseProperties();
+    props.put(ArrowFlightConnectionProperty.OAUTH_FLOW.camelName(), "client_credentials");
+    props.put(ArrowFlightConnectionProperty.OAUTH_TOKEN_URI.camelName(), tokenEndpoint.toString());
+    // Missing client_id and client_secret
+
+    assertThrows(SQLException.class, () -> DriverManager.getConnection(getJdbcUrl(), props));
+  }
+
+  @Test
+  public void testMissingRequiredParametersTokenExchange() {
+    Properties props = createBaseProperties();
+    props.put(ArrowFlightConnectionProperty.OAUTH_FLOW.camelName(), "token_exchange");
+    props.put(ArrowFlightConnectionProperty.OAUTH_TOKEN_URI.camelName(), tokenEndpoint.toString());
+    // Missing subject_token and subject_token_type
+
+    assertThrows(SQLException.class, () -> DriverManager.getConnection(getJdbcUrl(), props));
+  }
+
+  @Test
+  public void testInvalidOAuthFlow() {
+    Properties props = createBaseProperties();
+    props.put(ArrowFlightConnectionProperty.OAUTH_FLOW.camelName(), "invalid_flow");
+    props.put(ArrowFlightConnectionProperty.OAUTH_TOKEN_URI.camelName(), tokenEndpoint.toString());
+
+    assertThrows(SQLException.class, () -> DriverManager.getConnection(getJdbcUrl(), props));
+  }
+
+  @Test
+  public void testMalformedTokenEndpoint() {
+    Properties props = createBaseProperties();
+    props.put(ArrowFlightConnectionProperty.OAUTH_FLOW.camelName(), "client_credentials");
+    props.put(ArrowFlightConnectionProperty.OAUTH_TOKEN_URI.camelName(), "not-a-valid-uri://");
+    props.put(ArrowFlightConnectionProperty.OAUTH_CLIENT_ID.camelName(), CLIENT_ID);
+    props.put(ArrowFlightConnectionProperty.OAUTH_CLIENT_SECRET.camelName(), CLIENT_SECRET);
+
+    assertThrows(SQLException.class, () -> DriverManager.getConnection(getJdbcUrl(), props));
+  }
+
+  // ==================== Authorization Header Verification ====================
+
+  @Test
+  public void testOAuthTokenSentAsBearer() throws Exception {
+    enqueueSuccessfulTokenResponse();
+
+    Properties props = createBaseProperties();
+    props.put(ArrowFlightConnectionProperty.OAUTH_FLOW.camelName(), "client_credentials");
+    props.put(ArrowFlightConnectionProperty.OAUTH_TOKEN_URI.camelName(), tokenEndpoint.toString());
+    props.put(ArrowFlightConnectionProperty.OAUTH_CLIENT_ID.camelName(), CLIENT_ID);
+    props.put(ArrowFlightConnectionProperty.OAUTH_CLIENT_SECRET.camelName(), CLIENT_SECRET);
+
+    try (Connection conn = DriverManager.getConnection(getJdbcUrl(), props)) {
+      conn.getMetaData().getCatalogs().close();
+    }
+
+    // Verify the Flight server received the bearer token
+    String authHeader =
+        FLIGHT_SERVER_TEST_EXTENSION
+            .getInterceptorFactory()
+            .getHeader(org.apache.arrow.flight.FlightMethod.GET_FLIGHT_INFO, "authorization");
+    assertNotNull(authHeader, "Authorization header should be present in Flight requests");
+    assertEquals("Bearer " + VALID_ACCESS_TOKEN, authHeader);
+  }
+}
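Note on testTokenRefreshAfterExpiration above: it relies on a token with a 1-second lifetime being treated as already expired because of a 30-second safety buffer. A minimal sketch of such a check follows, assuming a hypothetical `TokenExpirySketch` helper; the driver's actual provider implementation is not part of this section and may differ.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper for illustration only; not a class from the Arrow JDBC driver.
final class TokenExpirySketch {
  // Assumed to mirror the 30s buffer mentioned in the test comment.
  static final Duration EXPIRY_BUFFER = Duration.ofSeconds(30);

  /** Returns true when the cached token should be re-fetched before the next call. */
  static boolean needsRefresh(Instant issuedAt, long expiresInSeconds, Instant now) {
    // Refresh once "now" reaches the expiry time minus the safety buffer.
    Instant refreshAt = issuedAt.plusSeconds(expiresInSeconds).minus(EXPIRY_BUFFER);
    return !now.isBefore(refreshAt);
  }

  public static void main(String[] args) {
    Instant issuedAt = Instant.now();
    System.out.println("1s token needs refresh: " + needsRefresh(issuedAt, 1, issuedAt));
    System.out.println("3600s token needs refresh: " + needsRefresh(issuedAt, 3600, issuedAt));
  }
}
```

Under this assumption, any token whose lifetime is shorter than the buffer is refreshed on every call, which is why the test expects exactly two requests to the token endpoint.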
diff --git a/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/client/oauth/OAuthConfigurationTest.java b/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/client/oauth/OAuthConfigurationTest.java
new file mode 100644
index 000000000..c258a7c65
--- /dev/null
+++ b/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/client/oauth/OAuthConfigurationTest.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.driver.jdbc.client.oauth;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import com.nimbusds.oauth2.sdk.Scope;
+import java.net.URI;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.stream.Stream;
+import org.junit.jupiter.api.Named;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+/** Tests for {@link OAuthConfiguration}. */
+public class OAuthConfigurationTest {
+
+  private static final String TOKEN_URI = "https://auth.example.com/token";
+  private static final String CLIENT_ID = "test-client-id";
+  private static final String CLIENT_SECRET = "test-client-secret";
+  private static final String SCOPE = "read write";
+  private static final String SUBJECT_TOKEN = "subject-token-value";
+  public static final String RESOURCE = "https://api.example.com/resource";
+
+  @FunctionalInterface
+  interface BuilderConfigurer {
+    void configure(OAuthConfiguration.Builder builder) throws SQLException;
+  }
+
+  static Stream<Arguments> createFlowCases() {
+    return Stream.of(
+        Arguments.of(
+            Named.of(
+                "string flow", (BuilderConfigurer) builder -> builder.flow("client_credentials"))),
+        Arguments.of(
+            Named.of(
+                "uppercase string flow",
+                (BuilderConfigurer) builder -> builder.flow("CLIENT_CREDENTIALS"))));
+  }
+
+  @ParameterizedTest
+  @MethodSource("createFlowCases")
+  public void testCreateFlowConfiguration(BuilderConfigurer flowConfigurer) throws SQLException {
+    OAuthConfiguration.Builder builder = new OAuthConfiguration.Builder();
+    flowConfigurer.configure(builder);
+    OAuthConfiguration config =
+        builder.tokenUri(TOKEN_URI).clientId(CLIENT_ID).clientSecret(CLIENT_SECRET).build();
+
+    // Verify configuration creates correct provider type
+    OAuthTokenProvider provider = config.createTokenProvider();
+    assertInstanceOf(ClientCredentialsTokenProvider.class, provider);
+  }
+
+  @Test
+  public void testCreateClientCredentialsTokenProvider() throws SQLException {
+    OAuthConfiguration config =
+        new OAuthConfiguration.Builder()
+            .flow("client_credentials")
+            .tokenUri(TOKEN_URI)
+            .clientId(CLIENT_ID)
+            .clientSecret(CLIENT_SECRET)
+            .scope(SCOPE)
+            .build();
+
+    OAuthTokenProvider provider = config.createTokenProvider();
+
+    assertNotNull(provider);
+    assertInstanceOf(ClientCredentialsTokenProvider.class, provider);
+
+    ClientCredentialsTokenProvider ccProvider = (ClientCredentialsTokenProvider) provider;
+    assertEquals(URI.create(TOKEN_URI), ccProvider.tokenUri);
+    assertEquals(CLIENT_ID, ccProvider.clientAuth.getClientID().getValue());
+    assertEquals(Scope.parse(SCOPE), ccProvider.scope);
+  }
+
+  @Test
+  public void testCreateTokenExchangeTokenProviderWithAllOptions() throws SQLException {
+    String subjectTokenType = "urn:ietf:params:oauth:token-type:access_token";
+    String actorToken = "actor-token-value";
+    String actorTokenType = "urn:ietf:params:oauth:token-type:jwt";
+    String audience = "https://api.example.com";
+    String requestedTokenType = "urn:ietf:params:oauth:token-type:access_token";
+
+    OAuthConfiguration config =
+        new OAuthConfiguration.Builder()
+            .flow("token_exchange")
+            .tokenUri(TOKEN_URI)
+            .scope(SCOPE)
+            .clientId(CLIENT_ID)
+            .clientSecret(CLIENT_SECRET)
+            .resource(RESOURCE)
+            .subjectToken(SUBJECT_TOKEN)
+            .subjectTokenType(subjectTokenType)
+            .actorToken(actorToken)
+            .actorTokenType(actorTokenType)
+            .audience(audience)
+            .requestedTokenType(requestedTokenType)
+            .build();
+
+    OAuthTokenProvider provider = config.createTokenProvider();
+
+    assertNotNull(provider);
+    assertInstanceOf(TokenExchangeTokenProvider.class, provider);
+
+    TokenExchangeTokenProvider teProvider = (TokenExchangeTokenProvider) provider;
+    assertEquals(URI.create(TOKEN_URI), teProvider.tokenUri);
+    assertNotNull(teProvider.grant);
+    assertEquals(SUBJECT_TOKEN, teProvider.grant.getSubjectToken().getValue());
+    assertEquals(subjectTokenType, teProvider.grant.getSubjectTokenType().getURI().toString());
+    assertEquals(actorToken, teProvider.grant.getActorToken().getValue());
+    assertEquals(actorTokenType, teProvider.grant.getActorTokenType().getURI().toString());
+    assertNotNull(teProvider.grant.getAudience());
+    assertEquals(1, teProvider.grant.getAudience().size());
+    assertEquals(audience, teProvider.grant.getAudience().get(0).getValue());
+    assertEquals(requestedTokenType, teProvider.grant.getRequestedTokenType().getURI().toString());
+    assertEquals(Scope.parse(SCOPE), teProvider.scope);
+    assertEquals(Collections.singletonList(URI.create(RESOURCE)), teProvider.resources);
+
+    assertEquals(CLIENT_ID, teProvider.clientAuth.getClientID().getValue());
+  }
+
+  static Stream<Arguments> generalValidationErrorCases() {
+    return Stream.of(
+        Arguments.of(
+            Named.of(
+                "null flow",
+                (BuilderConfigurer) builder -> builder.flow((String) null).tokenUri(TOKEN_URI)),
+            "OAuth flow cannot be null or empty"),
+        Arguments.of(
+            Named.of(
+                "empty flow", (BuilderConfigurer) builder -> builder.flow("").tokenUri(TOKEN_URI)),
+            "OAuth flow cannot be null or empty"),
+        Arguments.of(
+            Named.of(
+                "invalid flow",
+                (BuilderConfigurer) builder -> builder.flow("invalid_flow").tokenUri(TOKEN_URI)),
+            "Unsupported OAuth flow: invalid_flow"),
+        Arguments.of(
+            Named.of(
+                "null tokenUri",
+                (BuilderConfigurer)
+                    builder ->
+                        builder
+                            .flow("client_credentials")
+                            .tokenUri((String) null)
+                            .clientId(CLIENT_ID)
+                            .clientSecret(CLIENT_SECRET)),
+            "Token URI cannot be null or empty"),
+        Arguments.of(
+            Named.of(
+                "empty tokenUri",
+                (BuilderConfigurer)
+                    builder ->
+                        builder
+                            .flow("client_credentials")
+                            .tokenUri("")
+                            .clientId(CLIENT_ID)
+                            .clientSecret(CLIENT_SECRET)),
+            "Token URI cannot be null or empty"),
+        Arguments.of(
+            Named.of(
+                "invalid tokenUri",
+                (BuilderConfigurer)
+                    builder ->
+                        builder
+                            .flow("client_credentials")
+                            .tokenUri("not a valid uri ://")
+                            .clientId(CLIENT_ID)
+                            .clientSecret(CLIENT_SECRET)),
+            // null means verify exception has message and cause
+            null),
+        Arguments.of(
+            Named.of(
+                "missing clientSecret",
+                (BuilderConfigurer)
+                    builder ->
+                        builder.flow("client_credentials").tokenUri(TOKEN_URI).clientId(CLIENT_ID)),
+            "clientSecret is required for client_credentials flow"));
+  }
+
+  @ParameterizedTest
+  @MethodSource("generalValidationErrorCases")
+  public void testGeneralValidationErrors(BuilderConfigurer configurer, String expectedMessage) {
+    SQLException exception =
+        assertThrows(
+            SQLException.class,
+            () -> {
+              OAuthConfiguration.Builder builder = new OAuthConfiguration.Builder();
+              configurer.configure(builder);
+              builder.build();
+            });
+
+    if (expectedMessage != null) {
+      assertEquals(expectedMessage, exception.getMessage());
+    } else {
+      assertNotNull(exception.getMessage());
+      assertNotNull(exception.getCause());
+    }
+  }
+
+  static Stream<Arguments> flowSpecificValidationErrorCases() {
+    return Stream.of(
+        // client_credentials flow validation
+        Arguments.of(
+            Named.of(
+                "client_credentials: missing clientId",
+                (BuilderConfigurer)
+                    builder ->
+                        builder
+                            .flow("client_credentials")
+                            .tokenUri(TOKEN_URI)
+                            .clientSecret(CLIENT_SECRET)),
+            "clientId is required for client_credentials flow"),
+        Arguments.of(
+            Named.of(
+                "client_credentials: missing clientSecret",
+                (BuilderConfigurer)
+                    builder ->
+                        builder.flow("client_credentials").tokenUri(TOKEN_URI).clientId(CLIENT_ID)),
+            "clientSecret is required for client_credentials flow"),
+        // token_exchange flow validation
+        Arguments.of(
+            Named.of(
+                "token_exchange: missing subjectToken",
+                (BuilderConfigurer) builder -> builder.flow("token_exchange").tokenUri(TOKEN_URI)),
+            "subjectToken is required for token_exchange flow"),
+        Arguments.of(
+            Named.of(
+                "token_exchange: empty subjectToken",
+                (BuilderConfigurer)
+                    builder ->
+                        builder
+                            .flow("token_exchange")
+                            .tokenUri(TOKEN_URI)
+                            .subjectToken("")
+                            .subjectTokenType("urn:ietf:params:oauth:token-type:access_token")),
+            "subjectToken is required for token_exchange flow"),
+        Arguments.of(
+            Named.of(
+                "token_exchange: missing subjectTokenType",
+                (BuilderConfigurer)
+                    builder ->
+                        builder
+                            .flow("token_exchange")
+                            .tokenUri(TOKEN_URI)
+                            .subjectToken(SUBJECT_TOKEN)),
+            "subjectTokenType is required for token_exchange flow"),
+        Arguments.of(
+            Named.of(
+                "token_exchange: empty subjectTokenType",
+                (BuilderConfigurer)
+                    builder ->
+                        builder
+                            .flow("token_exchange")
+                            .tokenUri(TOKEN_URI)
+                            .subjectToken(SUBJECT_TOKEN)
+                            .subjectTokenType("")),
+            "subjectTokenType is required for token_exchange flow"));
+  }
+
+  @ParameterizedTest
+  @MethodSource("flowSpecificValidationErrorCases")
+  public void testFlowSpecificValidationErrors(
+      BuilderConfigurer configurer, String expectedMessage) {
+    SQLException exception =
+        assertThrows(
+            SQLException.class,
+            () -> {
+              OAuthConfiguration.Builder builder = new OAuthConfiguration.Builder();
+              configurer.configure(builder);
+              builder.build();
+            });
+
+    assertEquals(expectedMessage, exception.getMessage());
+  }
+}
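Note on the validation cases above: they expect the flow name to be matched case-insensitively, and they pin two exact error messages. A minimal sketch of parsing logic that would satisfy those expectations follows. `OAuthFlowSketch` and its `Flow` enum are hypothetical names introduced for illustration; the real `OAuthConfiguration.Builder` is not shown in this section and may be structured differently.

```java
import java.sql.SQLException;
import java.util.Locale;

// Hypothetical sketch for illustration only; not the driver's OAuthConfiguration.
final class OAuthFlowSketch {
  enum Flow {
    CLIENT_CREDENTIALS,
    TOKEN_EXCHANGE
  }

  /** Parses a flow name case-insensitively, using the messages the tests assert on. */
  static Flow parse(String flow) throws SQLException {
    if (flow == null || flow.isEmpty()) {
      throw new SQLException("OAuth flow cannot be null or empty");
    }
    try {
      // Locale.ROOT avoids locale-specific case mapping surprises.
      return Flow.valueOf(flow.toUpperCase(Locale.ROOT));
    } catch (IllegalArgumentException e) {
      throw new SQLException("Unsupported OAuth flow: " + flow, e);
    }
  }
}
```

Echoing the original input in the error message, as the "invalid flow" case expects, keeps the report aligned with what the user typed in the connection string.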
diff --git a/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/client/oauth/OAuthCredentialWriterTest.java b/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/client/oauth/OAuthCredentialWriterTest.java
new file mode 100644
index 000000000..1a33f7f0a
--- /dev/null
+++ b/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/client/oauth/OAuthCredentialWriterTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.driver.jdbc.client.oauth;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.sql.SQLException;
+import org.apache.arrow.flight.CallHeaders;
+import org.apache.arrow.flight.FlightCallHeaders;
+import org.apache.arrow.flight.auth2.Auth2Constants;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/** Tests for {@link OAuthCredentialWriter}. */
+@ExtendWith(MockitoExtension.class)
+public class OAuthCredentialWriterTest {
+
+  @Mock private OAuthTokenProvider mockTokenProvider;
+
+  @Test
+  public void testConstructorRejectsNullTokenProvider() {
+    assertThrows(NullPointerException.class, () -> new OAuthCredentialWriter(null));
+  }
+
+  @Test
+  public void testAcceptWritesBearerTokenToHeaders() throws SQLException {
+    String testToken = "test-access-token-12345";
+    when(mockTokenProvider.getValidToken()).thenReturn(testToken);
+
+    OAuthCredentialWriter writer = new OAuthCredentialWriter(mockTokenProvider);
+    CallHeaders headers = new FlightCallHeaders();
+
+    writer.accept(headers);
+
+    verify(mockTokenProvider).getValidToken();
+    assertEquals(
+        Auth2Constants.BEARER_PREFIX + testToken, headers.get(Auth2Constants.AUTHORIZATION_HEADER));
+  }
+
+  @Test
+  public void testAcceptThrowsOAuthTokenExceptionOnSQLException() throws SQLException {
+    SQLException sqlException = new SQLException("Token fetch failed");
+    when(mockTokenProvider.getValidToken()).thenThrow(sqlException);
+
+    OAuthCredentialWriter writer = new OAuthCredentialWriter(mockTokenProvider);
+    CallHeaders headers = new FlightCallHeaders();
+
+    OAuthTokenException exception =
+        assertThrows(OAuthTokenException.class, () -> writer.accept(headers));
+
+    assertEquals("Failed to obtain OAuth token", exception.getMessage());
+    assertEquals(sqlException, exception.getCause());
+  }
+
+  @Test
+  public void testAcceptCallsTokenProviderEachTime() throws SQLException {
+    when(mockTokenProvider.getValidToken())
+        .thenReturn("token1")
+        .thenReturn("token2")
+        .thenReturn("token3");
+
+    OAuthCredentialWriter writer = new OAuthCredentialWriter(mockTokenProvider);
+
+    CallHeaders headers1 = new FlightCallHeaders();
+    writer.accept(headers1);
+    assertEquals("Bearer token1", headers1.get(Auth2Constants.AUTHORIZATION_HEADER));
+
+    CallHeaders headers2 = new FlightCallHeaders();
+    writer.accept(headers2);
+    assertEquals("Bearer token2", headers2.get(Auth2Constants.AUTHORIZATION_HEADER));
+
+    CallHeaders headers3 = new FlightCallHeaders();
+    writer.accept(headers3);
+    assertEquals("Bearer token3", headers3.get(Auth2Constants.AUTHORIZATION_HEADER));
+  }
+}
diff --git a/flight/flight-sql-jdbc-driver/src/shade/LICENSE.txt b/flight/flight-sql-jdbc-driver/src/shade/LICENSE.txt
index 8bc43cbe0..8476bd999 100644
--- a/flight/flight-sql-jdbc-driver/src/shade/LICENSE.txt
+++ b/flight/flight-sql-jdbc-driver/src/shade/LICENSE.txt
@@ -345,6 +345,14 @@ License: https://www.apache.org/licenses/LICENSE-2.0
 
 --------------------------------------------------------------------------------
 
+This binary artifact contains Nimbus OAuth 2.0 SDK with OpenID Connect extensions 11.20.1.
+
+Copyright: Copyright 2012-2024 Connect2id Ltd.
+Home page: https://connect2id.com/products/nimbus-oauth-openid-connect-sdk
+License: https://www.apache.org/licenses/LICENSE-2.0
+
+--------------------------------------------------------------------------------
+
 This binary artifact contains Bouncycastle 1.80.
 
 Copyright: Copyright (c) 2000-2024 The Legion of the Bouncy Castle Inc. (https://www.bouncycastle.org).
diff --git a/flight/flight-sql-jdbc-driver/src/test/java/org/apache/arrow/driver/jdbc/ITDriverJarValidation.java b/flight/flight-sql-jdbc-driver/src/test/java/org/apache/arrow/driver/jdbc/ITDriverJarValidation.java
index c1bd111fb..145744ad3 100644
--- a/flight/flight-sql-jdbc-driver/src/test/java/org/apache/arrow/driver/jdbc/ITDriverJarValidation.java
+++ b/flight/flight-sql-jdbc-driver/src/test/java/org/apache/arrow/driver/jdbc/ITDriverJarValidation.java
@@ -70,6 +70,10 @@ public class ITDriverJarValidation {
           "LICENSE.txt",
           "NOTICE.txt",
           "arrow-git.properties",
+          "iso3166_1alpha2-codes.properties",
+          "iso3166_1alpha3-codes.properties",
+          "iso3166_1alpha-2-3-map.properties",
+          "iso3166_3-codes.properties",
           "properties/flight.properties",
           "META-INF/io.netty.versions.properties",
           "META-INF/MANIFEST.MF",
