This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git


The following commit(s) were added to refs/heads/main by this push:
     new 944752369 feat(go/adbc/driver/bigquery): support service account 
impersonation (#3174)
944752369 is described below

commit 944752369ab7492dcb69479a4c473367ea19bed5
Author: Yu Ishikawa <[email protected]>
AuthorDate: Sat Jul 26 17:02:40 2025 +0900

    feat(go/adbc/driver/bigquery): support service account impersonation (#3174)
    
    Closes https://github.com/apache/arrow-adbc/issues/3168
    
    ---------
    
    Signed-off-by: Yu Ishikawa <[email protected]>
---
 go/adbc/driver/bigquery/bigquery_database.go |  76 ++++++++++---
 go/adbc/driver/bigquery/connection.go        | 163 ++++++++++++++++++++-------
 go/adbc/driver/bigquery/driver.go            |  32 ++++++
 go/adbc/driver/bigquery/driver_test.go       |  38 +++++++
 4 files changed, 252 insertions(+), 57 deletions(-)

diff --git a/go/adbc/driver/bigquery/bigquery_database.go 
b/go/adbc/driver/bigquery/bigquery_database.go
index cd6fff8a1..caaa7dbe3 100644
--- a/go/adbc/driver/bigquery/bigquery_database.go
+++ b/go/adbc/driver/bigquery/bigquery_database.go
@@ -20,6 +20,8 @@ package bigquery
 import (
        "context"
        "fmt"
+       "strings"
+       "time"
 
        "github.com/apache/arrow-adbc/go/adbc"
        "github.com/apache/arrow-adbc/go/adbc/driver/internal/driverbase"
@@ -33,6 +35,12 @@ type databaseImpl struct {
        clientID     string
        clientSecret string
        refreshToken string
+
+       impersonateTargetPrincipal string
+       impersonateDelegates       []string
+       impersonateScopes          []string
+       impersonateLifetime        time.Duration
+
        // projectID is the catalog
        projectID string
        // datasetID is the schema
@@ -42,17 +50,21 @@ type databaseImpl struct {
 
 func (d *databaseImpl) Open(ctx context.Context) (adbc.Connection, error) {
        conn := &connectionImpl{
-               ConnectionImplBase:     
driverbase.NewConnectionImplBase(&d.DatabaseImplBase),
-               authType:               d.authType,
-               credentials:            d.credentials,
-               clientID:               d.clientID,
-               clientSecret:           d.clientSecret,
-               refreshToken:           d.refreshToken,
-               tableID:                d.tableID,
-               catalog:                d.projectID,
-               dbSchema:               d.datasetID,
-               resultRecordBufferSize: defaultQueryResultBufferSize,
-               prefetchConcurrency:    defaultQueryPrefetchConcurrency,
+               ConnectionImplBase:         
driverbase.NewConnectionImplBase(&d.DatabaseImplBase),
+               authType:                   d.authType,
+               credentials:                d.credentials,
+               clientID:                   d.clientID,
+               clientSecret:               d.clientSecret,
+               refreshToken:               d.refreshToken,
+               impersonateTargetPrincipal: d.impersonateTargetPrincipal,
+               impersonateDelegates:       d.impersonateDelegates,
+               impersonateScopes:          d.impersonateScopes,
+               impersonateLifetime:        d.impersonateLifetime,
+               tableID:                    d.tableID,
+               catalog:                    d.projectID,
+               dbSchema:                   d.datasetID,
+               resultRecordBufferSize:     defaultQueryResultBufferSize,
+               prefetchConcurrency:        defaultQueryPrefetchConcurrency,
        }
 
        err := conn.newClient(ctx)
@@ -88,6 +100,15 @@ func (d *databaseImpl) GetOption(key string) (string, 
error) {
                return d.datasetID, nil
        case OptionStringTableID:
                return d.tableID, nil
+       case OptionStringImpersonateLifetime:
+               if d.impersonateLifetime == 0 {
+                       // If no lifetime is set but impersonation is enabled, 
return the default
+                       if d.hasImpersonationOptions() {
+                               return (3600 * time.Second).String(), nil
+                       }
+                       return "", nil
+               }
+               return d.impersonateLifetime.String(), nil
        default:
                return d.DatabaseImplBase.GetOption(key)
        }
@@ -103,17 +124,21 @@ func (d *databaseImpl) SetOptions(options 
map[string]string) error {
        return nil
 }
 
+func (d *databaseImpl) hasImpersonationOptions() bool {
+       return d.impersonateTargetPrincipal != "" ||
+               len(d.impersonateDelegates) > 0 ||
+               len(d.impersonateScopes) > 0
+}
+
 func (d *databaseImpl) SetOption(key string, value string) error {
        switch key {
        case OptionStringAuthType:
                switch value {
-               case OptionValueAuthTypeDefault:
-                       d.authType = value
-               case OptionValueAuthTypeJSONCredentialFile:
-                       d.authType = value
-               case OptionValueAuthTypeJSONCredentialString:
-                       d.authType = value
-               case OptionValueAuthTypeUserAuthentication:
+               case OptionValueAuthTypeDefault,
+                       OptionValueAuthTypeJSONCredentialFile,
+                       OptionValueAuthTypeJSONCredentialString,
+                       OptionValueAuthTypeUserAuthentication,
+                       OptionValueAuthTypeAppDefaultCredentials:
                        d.authType = value
                default:
                        return adbc.Error{
@@ -129,6 +154,21 @@ func (d *databaseImpl) SetOption(key string, value string) 
error {
                d.clientSecret = value
        case OptionStringAuthRefreshToken:
                d.refreshToken = value
+       case OptionStringImpersonateTargetPrincipal:
+               d.impersonateTargetPrincipal = value
+       case OptionStringImpersonateDelegates:
+               d.impersonateDelegates = strings.Split(value, ",")
+       case OptionStringImpersonateScopes:
+               d.impersonateScopes = strings.Split(value, ",")
+       case OptionStringImpersonateLifetime:
+               duration, err := time.ParseDuration(value)
+               if err != nil {
+                       return adbc.Error{
+                               Code: adbc.StatusInvalidArgument,
+                               Msg:  fmt.Sprintf("invalid impersonate lifetime 
value `%s`: %v", value, err),
+                       }
+               }
+               d.impersonateLifetime = duration
        case OptionStringProjectID:
                d.projectID = value
        case OptionStringDatasetID:
diff --git a/go/adbc/driver/bigquery/connection.go 
b/go/adbc/driver/bigquery/connection.go
index 3be117e12..a8ec4fa65 100644
--- a/go/adbc/driver/bigquery/connection.go
+++ b/go/adbc/driver/bigquery/connection.go
@@ -29,6 +29,7 @@ import (
        "net/url"
        "regexp"
        "strconv"
+       "strings"
        "time"
 
        "cloud.google.com/go/bigquery"
@@ -37,6 +38,7 @@ import (
        "github.com/apache/arrow-adbc/go/adbc/driver/internal/driverbase"
        "github.com/apache/arrow-go/v18/arrow"
        "golang.org/x/oauth2"
+       "google.golang.org/api/impersonate"
        "google.golang.org/api/iterator"
        "google.golang.org/api/option"
 )
@@ -50,6 +52,11 @@ type connectionImpl struct {
        clientSecret string
        refreshToken string
 
+       impersonateTargetPrincipal string
+       impersonateDelegates       []string
+       impersonateScopes          []string
+       impersonateLifetime        time.Duration
+
        // catalog is the same as the project id in BigQuery
        catalog string
        // dbSchema is the same as the dataset id in BigQuery
@@ -465,11 +472,53 @@ func (c *connectionImpl) GetOption(key string) (string, 
error) {
                return c.dbSchema, nil
        case OptionStringTableID:
                return c.tableID, nil
+       case OptionStringImpersonateLifetime:
+               if c.impersonateLifetime == 0 {
+                       // If no lifetime is set but impersonation is enabled, 
return the default
+                       if c.hasImpersonationOptions() {
+                               return (3600 * time.Second).String(), nil
+                       }
+                       return "", nil
+               }
+               return c.impersonateLifetime.String(), nil
        default:
                return c.ConnectionImplBase.GetOption(key)
        }
 }
 
+func (c *connectionImpl) SetOption(key string, value string) error {
+       switch key {
+       case OptionStringAuthType:
+               c.authType = value
+       case OptionStringAuthCredentials:
+               c.credentials = value
+       case OptionStringAuthClientID:
+               c.clientID = value
+       case OptionStringAuthClientSecret:
+               c.clientSecret = value
+       case OptionStringAuthRefreshToken:
+               c.refreshToken = value
+       case OptionStringImpersonateTargetPrincipal:
+               c.impersonateTargetPrincipal = value
+       case OptionStringImpersonateDelegates:
+               c.impersonateDelegates = strings.Split(value, ",")
+       case OptionStringImpersonateScopes:
+               c.impersonateScopes = strings.Split(value, ",")
+       case OptionStringImpersonateLifetime:
+               dur, err := time.ParseDuration(value)
+               if err != nil {
+                       return adbc.Error{
+                               Code: adbc.StatusInvalidArgument,
+                               Msg:  fmt.Sprintf("Invalid duration string for 
%s: %s", OptionStringImpersonateLifetime, err.Error()),
+                       }
+               }
+               c.impersonateLifetime = dur
+       default:
+               return c.ConnectionImplBase.SetOption(key, value)
+       }
+       return nil
+}
+
 func (c *connectionImpl) GetOptionInt(key string) (int64, error) {
        switch key {
        case OptionIntQueryResultBufferSize:
@@ -501,63 +550,99 @@ func (c *connectionImpl) newClient(ctx context.Context) 
error {
                        Msg:  "ProjectID is empty",
                }
        }
+
+       authOptions := []option.ClientOption{}
+
+       // First, establish base authentication
        switch c.authType {
-       case OptionValueAuthTypeJSONCredentialFile, 
OptionValueAuthTypeJSONCredentialString, OptionValueAuthTypeUserAuthentication:
-               var credentials option.ClientOption
-               switch c.authType {
-               case OptionValueAuthTypeJSONCredentialFile:
-                       credentials = option.WithCredentialsFile(c.credentials)
-               case OptionValueAuthTypeJSONCredentialString:
-                       credentials = 
option.WithCredentialsJSON([]byte(c.credentials))
-               default:
-                       if c.clientID == "" {
-                               return adbc.Error{
-                                       Code: adbc.StatusInvalidArgument,
-                                       Msg:  fmt.Sprintf("The `%s` parameter 
is empty", OptionStringAuthClientID),
-                               }
+       case OptionValueAuthTypeJSONCredentialFile:
+               authOptions = append(authOptions, 
option.WithCredentialsFile(c.credentials))
+       case OptionValueAuthTypeJSONCredentialString:
+               authOptions = append(authOptions, 
option.WithCredentialsJSON([]byte(c.credentials)))
+       case OptionValueAuthTypeUserAuthentication:
+               if c.clientID == "" {
+                       return adbc.Error{
+                               Code: adbc.StatusInvalidArgument,
+                               Msg:  fmt.Sprintf("The `%s` parameter is 
empty", OptionStringAuthClientID),
                        }
-                       if c.clientSecret == "" {
-                               return adbc.Error{
-                                       Code: adbc.StatusInvalidArgument,
-                                       Msg:  fmt.Sprintf("The `%s` parameter 
is empty", OptionStringAuthClientSecret),
-                               }
+               }
+               if c.clientSecret == "" {
+                       return adbc.Error{
+                               Code: adbc.StatusInvalidArgument,
+                               Msg:  fmt.Sprintf("The `%s` parameter is 
empty", OptionStringAuthClientSecret),
                        }
-                       if c.refreshToken == "" {
-                               return adbc.Error{
-                                       Code: adbc.StatusInvalidArgument,
-                                       Msg:  fmt.Sprintf("The `%s` parameter 
is empty", OptionStringAuthRefreshToken),
-                               }
+               }
+               if c.refreshToken == "" {
+                       return adbc.Error{
+                               Code: adbc.StatusInvalidArgument,
+                               Msg:  fmt.Sprintf("The `%s` parameter is 
empty", OptionStringAuthRefreshToken),
                        }
-                       credentials = option.WithTokenSource(c)
                }
-
-               client, err := bigquery.NewClient(ctx, c.catalog, credentials)
-               if err != nil {
-                       return err
+               authOptions = append(authOptions, option.WithTokenSource(c))
+       case OptionValueAuthTypeAppDefaultCredentials, "":
+               // Use Application Default Credentials (default behavior)
+               // No additional options needed - ADC is used by default
+       default:
+               return adbc.Error{
+                       Code: adbc.StatusInvalidArgument,
+                       Msg:  fmt.Sprintf("Unknown auth type: %s", c.authType),
                }
+       }
 
-               err = client.EnableStorageReadClient(ctx, credentials)
-               if err != nil {
-                       return err
+       // Then, apply impersonation if configured (as a credential 
transformation layer)
+       if c.hasImpersonationOptions() {
+               if c.impersonateTargetPrincipal == "" {
+                       return adbc.Error{
+                               Code: adbc.StatusInvalidArgument,
+                               Msg:  fmt.Sprintf("The `%s` parameter is empty 
for impersonation", OptionStringImpersonateTargetPrincipal),
+                       }
                }
 
-               c.client = client
-       default:
-               client, err := bigquery.NewClient(ctx, c.catalog)
-               if err != nil {
-                       return err
+               var lifetime time.Duration
+               if c.impersonateLifetime != 0 {
+                       lifetime = c.impersonateLifetime
+               } else {
+                       // Use default lifetime of 1 hour when impersonation is 
enabled but no lifetime is specified
+                       lifetime = 3600 * time.Second
                }
 
-               err = client.EnableStorageReadClient(ctx)
+               impCfg := impersonate.CredentialsConfig{
+                       TargetPrincipal: c.impersonateTargetPrincipal,
+                       Delegates:       c.impersonateDelegates,
+                       Scopes:          c.impersonateScopes,
+                       Lifetime:        lifetime,
+               }
+               tokenSource, err := impersonate.CredentialsTokenSource(ctx, 
impCfg)
                if err != nil {
-                       return err
+                       return adbc.Error{
+                               Code: adbc.StatusInvalidArgument,
+                               Msg:  fmt.Sprintf("failed to create 
impersonated token source: %s", err.Error()),
+                       }
                }
+               // Replace any existing token source with the impersonated one
+               authOptions = 
[]option.ClientOption{option.WithTokenSource(tokenSource)}
+       }
+
+       client, err := bigquery.NewClient(ctx, c.catalog, authOptions...)
+       if err != nil {
+               return err
+       }
 
-               c.client = client
+       err = client.EnableStorageReadClient(ctx, authOptions...)
+       if err != nil {
+               return err
        }
+
+       c.client = client
        return nil
 }
 
+func (c *connectionImpl) hasImpersonationOptions() bool {
+       return c.impersonateTargetPrincipal != "" ||
+               len(c.impersonateDelegates) > 0 ||
+               len(c.impersonateScopes) > 0
+}
+
 var (
        // Dataset:
        //
diff --git a/go/adbc/driver/bigquery/driver.go 
b/go/adbc/driver/bigquery/driver.go
index 1fc238cf1..8c1777ae0 100644
--- a/go/adbc/driver/bigquery/driver.go
+++ b/go/adbc/driver/bigquery/driver.go
@@ -77,6 +77,38 @@ const (
 
        AccessTokenEndpoint   = "https://accounts.google.com/o/oauth2/token";
        AccessTokenServerName = "google.com"
+
+       // WithAppDefaultCredentials instructs the driver to authenticate using
+       // Application Default Credentials (ADC).
+       OptionValueAuthTypeAppDefaultCredentials = 
"adbc.bigquery.sql.auth_type.app_default_credentials"
+
+       // WithJSONCredentials instructs the driver to authenticate using the
+       // given JSON credentials. The value should be a byte array representing
+       // the JSON credentials.
+       OptionValueAuthTypeJSONCredentials = 
"adbc.bigquery.sql.auth_type.json_credentials"
+
+       // WithOAuthClientIDs instructs the driver to authenticate using the 
given
+       // OAuth client ID and client secret. The value should be a string array
+       // of length 2, where the first element is the client ID and the second
+       // is the client secret.
+       OptionValueAuthTypeOAuthClientIDs = 
"adbc.bigquery.sql.auth_type.oauth_client_ids"
+
+       // OptionStringImpersonateTargetPrincipal instructs the driver to 
impersonate the
+       // given service account email.
+       OptionStringImpersonateTargetPrincipal = 
"adbc.bigquery.sql.impersonate.target_principal"
+
+       // OptionStringImpersonateDelegates instructs the driver to impersonate 
using the
+       // given comma-separated list of service account emails in the 
delegation
+       // chain.
+       OptionStringImpersonateDelegates = 
"adbc.bigquery.sql.impersonate.delegates"
+
+       // OptionStringImpersonateScopes instructs the driver to impersonate 
using the
+       // given comma-separated list of OAuth 2.0 scopes.
+       OptionStringImpersonateScopes = "adbc.bigquery.sql.impersonate.scopes"
+
+       // OptionStringImpersonateLifetime instructs the driver to impersonate 
for the
+       // given duration (e.g. "3600s").
+       OptionStringImpersonateLifetime = 
"adbc.bigquery.sql.impersonate.lifetime"
 )
 
 var (
diff --git a/go/adbc/driver/bigquery/driver_test.go 
b/go/adbc/driver/bigquery/driver_test.go
index f5d162ada..69c35b288 100644
--- a/go/adbc/driver/bigquery/driver_test.go
+++ b/go/adbc/driver/bigquery/driver_test.go
@@ -1551,3 +1551,41 @@ func (suite *BigQueryTests) 
TestMetadataGetObjectsColumnsXdbc() {
 }
 
 var _ validation.DriverQuirks = (*BigQueryQuirks)(nil)
+
+// TestAuthTypeConsolidation tests that all auth type values are handled
+// correctly in the consolidated switch statement.
+func TestAuthTypeConsolidation(t *testing.T) {
+       mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+       defer mem.AssertSize(t, 0)
+
+       drv := driver.NewDriver(mem)
+       db, err := drv.NewDatabase(nil)
+       if err != nil {
+               t.Fatalf("Failed to create database: %v", err)
+       }
+       defer validation.CheckedClose(t, db)
+
+       // Test all valid auth types
+       validAuthTypes := []string{
+               driver.OptionValueAuthTypeDefault,
+               driver.OptionValueAuthTypeJSONCredentialFile,
+               driver.OptionValueAuthTypeJSONCredentialString,
+               driver.OptionValueAuthTypeUserAuthentication,
+               driver.OptionValueAuthTypeAppDefaultCredentials,
+       }
+
+       for _, authType := range validAuthTypes {
+               err := 
db.SetOptions(map[string]string{driver.OptionStringAuthType: authType})
+               if err != nil {
+                       t.Errorf("Failed to set auth type %s: %v", authType, 
err)
+               }
+       }
+
+       // Test invalid auth type
+       err = db.SetOptions(map[string]string{driver.OptionStringAuthType: 
"invalid_auth_type"})
+       if err == nil {
+               t.Error("Expected error for invalid auth type")
+       } else if !strings.Contains(err.Error(), "unknown database auth type 
value") {
+               t.Errorf("Expected error message to contain 'unknown database 
auth type value', got: %v", err)
+       }
+}

Reply via email to