This is an automated email from the ASF dual-hosted git repository.
lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new 944752369 feat(go/adbc/driver/bigquery): support service account
impersonation (#3174)
944752369 is described below
commit 944752369ab7492dcb69479a4c473367ea19bed5
Author: Yu Ishikawa <[email protected]>
AuthorDate: Sat Jul 26 17:02:40 2025 +0900
feat(go/adbc/driver/bigquery): support service account impersonation (#3174)
Closes https://github.com/apache/arrow-adbc/issues/3168
---------
Signed-off-by: Yu Ishikawa <[email protected]>
---
go/adbc/driver/bigquery/bigquery_database.go | 76 ++++++++++---
go/adbc/driver/bigquery/connection.go | 163 ++++++++++++++++++++-------
go/adbc/driver/bigquery/driver.go | 32 ++++++
go/adbc/driver/bigquery/driver_test.go | 38 +++++++
4 files changed, 252 insertions(+), 57 deletions(-)
diff --git a/go/adbc/driver/bigquery/bigquery_database.go
b/go/adbc/driver/bigquery/bigquery_database.go
index cd6fff8a1..caaa7dbe3 100644
--- a/go/adbc/driver/bigquery/bigquery_database.go
+++ b/go/adbc/driver/bigquery/bigquery_database.go
@@ -20,6 +20,8 @@ package bigquery
import (
"context"
"fmt"
+ "strings"
+ "time"
"github.com/apache/arrow-adbc/go/adbc"
"github.com/apache/arrow-adbc/go/adbc/driver/internal/driverbase"
@@ -33,6 +35,12 @@ type databaseImpl struct {
clientID string
clientSecret string
refreshToken string
+
+ impersonateTargetPrincipal string
+ impersonateDelegates []string
+ impersonateScopes []string
+ impersonateLifetime time.Duration
+
// projectID is the catalog
projectID string
// datasetID is the schema
@@ -42,17 +50,21 @@ type databaseImpl struct {
func (d *databaseImpl) Open(ctx context.Context) (adbc.Connection, error) {
conn := &connectionImpl{
- ConnectionImplBase:
driverbase.NewConnectionImplBase(&d.DatabaseImplBase),
- authType: d.authType,
- credentials: d.credentials,
- clientID: d.clientID,
- clientSecret: d.clientSecret,
- refreshToken: d.refreshToken,
- tableID: d.tableID,
- catalog: d.projectID,
- dbSchema: d.datasetID,
- resultRecordBufferSize: defaultQueryResultBufferSize,
- prefetchConcurrency: defaultQueryPrefetchConcurrency,
+ ConnectionImplBase:
driverbase.NewConnectionImplBase(&d.DatabaseImplBase),
+ authType: d.authType,
+ credentials: d.credentials,
+ clientID: d.clientID,
+ clientSecret: d.clientSecret,
+ refreshToken: d.refreshToken,
+ impersonateTargetPrincipal: d.impersonateTargetPrincipal,
+ impersonateDelegates: d.impersonateDelegates,
+ impersonateScopes: d.impersonateScopes,
+ impersonateLifetime: d.impersonateLifetime,
+ tableID: d.tableID,
+ catalog: d.projectID,
+ dbSchema: d.datasetID,
+ resultRecordBufferSize: defaultQueryResultBufferSize,
+ prefetchConcurrency: defaultQueryPrefetchConcurrency,
}
err := conn.newClient(ctx)
@@ -88,6 +100,15 @@ func (d *databaseImpl) GetOption(key string) (string,
error) {
return d.datasetID, nil
case OptionStringTableID:
return d.tableID, nil
+ case OptionStringImpersonateLifetime:
+ if d.impersonateLifetime == 0 {
+ // If no lifetime is set but impersonation is enabled,
return the default
+ if d.hasImpersonationOptions() {
+ return (3600 * time.Second).String(), nil
+ }
+ return "", nil
+ }
+ return d.impersonateLifetime.String(), nil
default:
return d.DatabaseImplBase.GetOption(key)
}
@@ -103,17 +124,21 @@ func (d *databaseImpl) SetOptions(options
map[string]string) error {
return nil
}
+// hasImpersonationOptions reports whether any service account impersonation
+// option has been configured on the database. The delegate and scope lists
+// count only if they hold at least one non-blank entry: SetOption parses them
+// with strings.Split, which turns an empty option value into [""], and such a
+// value must not switch impersonation on.
+func (d *databaseImpl) hasImpersonationOptions() bool {
+	hasEntry := func(values []string) bool {
+		for _, v := range values {
+			if strings.TrimSpace(v) != "" {
+				return true
+			}
+		}
+		return false
+	}
+	return d.impersonateTargetPrincipal != "" ||
+		hasEntry(d.impersonateDelegates) ||
+		hasEntry(d.impersonateScopes)
+}
+
func (d *databaseImpl) SetOption(key string, value string) error {
switch key {
case OptionStringAuthType:
switch value {
- case OptionValueAuthTypeDefault:
- d.authType = value
- case OptionValueAuthTypeJSONCredentialFile:
- d.authType = value
- case OptionValueAuthTypeJSONCredentialString:
- d.authType = value
- case OptionValueAuthTypeUserAuthentication:
+ case OptionValueAuthTypeDefault,
+ OptionValueAuthTypeJSONCredentialFile,
+ OptionValueAuthTypeJSONCredentialString,
+ OptionValueAuthTypeUserAuthentication,
+ OptionValueAuthTypeAppDefaultCredentials:
d.authType = value
default:
return adbc.Error{
@@ -129,6 +154,21 @@ func (d *databaseImpl) SetOption(key string, value string)
error {
d.clientSecret = value
case OptionStringAuthRefreshToken:
d.refreshToken = value
+ case OptionStringImpersonateTargetPrincipal:
+ d.impersonateTargetPrincipal = value
+ case OptionStringImpersonateDelegates:
+ d.impersonateDelegates = strings.Split(value, ",")
+ case OptionStringImpersonateScopes:
+ d.impersonateScopes = strings.Split(value, ",")
+ case OptionStringImpersonateLifetime:
+ duration, err := time.ParseDuration(value)
+ if err != nil {
+ return adbc.Error{
+ Code: adbc.StatusInvalidArgument,
+ Msg: fmt.Sprintf("invalid impersonate lifetime
value `%s`: %v", value, err),
+ }
+ }
+ d.impersonateLifetime = duration
case OptionStringProjectID:
d.projectID = value
case OptionStringDatasetID:
diff --git a/go/adbc/driver/bigquery/connection.go
b/go/adbc/driver/bigquery/connection.go
index 3be117e12..a8ec4fa65 100644
--- a/go/adbc/driver/bigquery/connection.go
+++ b/go/adbc/driver/bigquery/connection.go
@@ -29,6 +29,7 @@ import (
"net/url"
"regexp"
"strconv"
+ "strings"
"time"
"cloud.google.com/go/bigquery"
@@ -37,6 +38,7 @@ import (
"github.com/apache/arrow-adbc/go/adbc/driver/internal/driverbase"
"github.com/apache/arrow-go/v18/arrow"
"golang.org/x/oauth2"
+ "google.golang.org/api/impersonate"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
)
@@ -50,6 +52,11 @@ type connectionImpl struct {
clientSecret string
refreshToken string
+ impersonateTargetPrincipal string
+ impersonateDelegates []string
+ impersonateScopes []string
+ impersonateLifetime time.Duration
+
// catalog is the same as the project id in BigQuery
catalog string
// dbSchema is the same as the dataset id in BigQuery
@@ -465,11 +472,53 @@ func (c *connectionImpl) GetOption(key string) (string,
error) {
return c.dbSchema, nil
case OptionStringTableID:
return c.tableID, nil
+ case OptionStringImpersonateLifetime:
+ if c.impersonateLifetime == 0 {
+ // If no lifetime is set but impersonation is enabled,
return the default
+ if c.hasImpersonationOptions() {
+ return (3600 * time.Second).String(), nil
+ }
+ return "", nil
+ }
+ return c.impersonateLifetime.String(), nil
default:
return c.ConnectionImplBase.GetOption(key)
}
}
+// SetOption sets a connection-level option. Authentication and service
+// account impersonation options are recorded on the connection; any other key
+// is delegated to ConnectionImplBase.SetOption.
+//
+// The comma-separated list options (delegates, scopes) are split on commas,
+// with surrounding whitespace trimmed and empty entries dropped, so an empty
+// value clears the list instead of producing [""]. The impersonation lifetime
+// must be a non-negative duration accepted by time.ParseDuration.
+//
+// NOTE(review): values are only stored here; this method does not rebuild
+// c.client, so confirm whether changes made after newClient has run are
+// expected to take effect.
+func (c *connectionImpl) SetOption(key string, value string) error {
+	switch key {
+	case OptionStringAuthType:
+		c.authType = value
+	case OptionStringAuthCredentials:
+		c.credentials = value
+	case OptionStringAuthClientID:
+		c.clientID = value
+	case OptionStringAuthClientSecret:
+		c.clientSecret = value
+	case OptionStringAuthRefreshToken:
+		c.refreshToken = value
+	case OptionStringImpersonateTargetPrincipal:
+		c.impersonateTargetPrincipal = value
+	case OptionStringImpersonateDelegates:
+		c.impersonateDelegates = parseCommaSeparatedOption(value)
+	case OptionStringImpersonateScopes:
+		c.impersonateScopes = parseCommaSeparatedOption(value)
+	case OptionStringImpersonateLifetime:
+		dur, err := time.ParseDuration(value)
+		if err != nil {
+			return adbc.Error{
+				Code: adbc.StatusInvalidArgument,
+				Msg:  fmt.Sprintf("invalid impersonate lifetime value `%s`: %v", value, err),
+			}
+		}
+		// A negative token lifetime is meaningless; zero keeps the default.
+		if dur < 0 {
+			return adbc.Error{
+				Code: adbc.StatusInvalidArgument,
+				Msg:  fmt.Sprintf("invalid impersonate lifetime value `%s`: must not be negative", value),
+			}
+		}
+		c.impersonateLifetime = dur
+	default:
+		return c.ConnectionImplBase.SetOption(key, value)
+	}
+	return nil
+}
+
+// parseCommaSeparatedOption splits a comma-separated option value into its
+// entries, trimming surrounding whitespace and dropping empty entries, so ""
+// yields nil and "a, b," yields ["a" "b"].
+func parseCommaSeparatedOption(value string) []string {
+	var entries []string
+	for _, part := range strings.Split(value, ",") {
+		if part = strings.TrimSpace(part); part != "" {
+			entries = append(entries, part)
+		}
+	}
+	return entries
+}
+
func (c *connectionImpl) GetOptionInt(key string) (int64, error) {
switch key {
case OptionIntQueryResultBufferSize:
@@ -501,63 +550,99 @@ func (c *connectionImpl) newClient(ctx context.Context)
error {
Msg: "ProjectID is empty",
}
}
+
+ authOptions := []option.ClientOption{}
+
+ // First, establish base authentication
switch c.authType {
- case OptionValueAuthTypeJSONCredentialFile,
OptionValueAuthTypeJSONCredentialString, OptionValueAuthTypeUserAuthentication:
- var credentials option.ClientOption
- switch c.authType {
- case OptionValueAuthTypeJSONCredentialFile:
- credentials = option.WithCredentialsFile(c.credentials)
- case OptionValueAuthTypeJSONCredentialString:
- credentials =
option.WithCredentialsJSON([]byte(c.credentials))
- default:
- if c.clientID == "" {
- return adbc.Error{
- Code: adbc.StatusInvalidArgument,
- Msg: fmt.Sprintf("The `%s` parameter
is empty", OptionStringAuthClientID),
- }
+ case OptionValueAuthTypeJSONCredentialFile:
+ authOptions = append(authOptions,
option.WithCredentialsFile(c.credentials))
+ case OptionValueAuthTypeJSONCredentialString:
+ authOptions = append(authOptions,
option.WithCredentialsJSON([]byte(c.credentials)))
+ case OptionValueAuthTypeUserAuthentication:
+ if c.clientID == "" {
+ return adbc.Error{
+ Code: adbc.StatusInvalidArgument,
+ Msg: fmt.Sprintf("The `%s` parameter is
empty", OptionStringAuthClientID),
}
- if c.clientSecret == "" {
- return adbc.Error{
- Code: adbc.StatusInvalidArgument,
- Msg: fmt.Sprintf("The `%s` parameter
is empty", OptionStringAuthClientSecret),
- }
+ }
+ if c.clientSecret == "" {
+ return adbc.Error{
+ Code: adbc.StatusInvalidArgument,
+ Msg: fmt.Sprintf("The `%s` parameter is
empty", OptionStringAuthClientSecret),
}
- if c.refreshToken == "" {
- return adbc.Error{
- Code: adbc.StatusInvalidArgument,
- Msg: fmt.Sprintf("The `%s` parameter
is empty", OptionStringAuthRefreshToken),
- }
+ }
+ if c.refreshToken == "" {
+ return adbc.Error{
+ Code: adbc.StatusInvalidArgument,
+ Msg: fmt.Sprintf("The `%s` parameter is
empty", OptionStringAuthRefreshToken),
}
- credentials = option.WithTokenSource(c)
}
-
- client, err := bigquery.NewClient(ctx, c.catalog, credentials)
- if err != nil {
- return err
+ authOptions = append(authOptions, option.WithTokenSource(c))
+ case OptionValueAuthTypeAppDefaultCredentials, "":
+ // Use Application Default Credentials (default behavior)
+ // No additional options needed - ADC is used by default
+ default:
+ return adbc.Error{
+ Code: adbc.StatusInvalidArgument,
+ Msg: fmt.Sprintf("Unknown auth type: %s", c.authType),
}
+ }
- err = client.EnableStorageReadClient(ctx, credentials)
- if err != nil {
- return err
+ // Then, apply impersonation if configured (as a credential
transformation layer)
+ if c.hasImpersonationOptions() {
+ if c.impersonateTargetPrincipal == "" {
+ return adbc.Error{
+ Code: adbc.StatusInvalidArgument,
+ Msg: fmt.Sprintf("The `%s` parameter is empty
for impersonation", OptionStringImpersonateTargetPrincipal),
+ }
}
- c.client = client
- default:
- client, err := bigquery.NewClient(ctx, c.catalog)
- if err != nil {
- return err
+ var lifetime time.Duration
+ if c.impersonateLifetime != 0 {
+ lifetime = c.impersonateLifetime
+ } else {
+ // Use default lifetime of 1 hour when impersonation is
enabled but no lifetime is specified
+ lifetime = 3600 * time.Second
}
- err = client.EnableStorageReadClient(ctx)
+ impCfg := impersonate.CredentialsConfig{
+ TargetPrincipal: c.impersonateTargetPrincipal,
+ Delegates: c.impersonateDelegates,
+ Scopes: c.impersonateScopes,
+ Lifetime: lifetime,
+ }
+ tokenSource, err := impersonate.CredentialsTokenSource(ctx,
impCfg)
if err != nil {
- return err
+ return adbc.Error{
+ Code: adbc.StatusInvalidArgument,
+ Msg: fmt.Sprintf("failed to create
impersonated token source: %s", err.Error()),
+ }
}
+ // Replace any existing token source with the impersonated one
+ authOptions =
[]option.ClientOption{option.WithTokenSource(tokenSource)}
+ }
+
+ client, err := bigquery.NewClient(ctx, c.catalog, authOptions...)
+ if err != nil {
+ return err
+ }
- c.client = client
+ err = client.EnableStorageReadClient(ctx, authOptions...)
+ if err != nil {
+ return err
}
+
+ c.client = client
return nil
}
+// hasImpersonationOptions reports whether any service account impersonation
+// option is configured on the connection; newClient uses it to decide whether
+// to wrap credentials in an impersonated token source. The delegate and scope
+// lists count only if they hold at least one non-blank entry, because values
+// split with strings.Split (e.g. copied from the database) turn an empty
+// option value into [""], which must not switch impersonation on.
+func (c *connectionImpl) hasImpersonationOptions() bool {
+	hasEntry := func(values []string) bool {
+		for _, v := range values {
+			if strings.TrimSpace(v) != "" {
+				return true
+			}
+		}
+		return false
+	}
+	return c.impersonateTargetPrincipal != "" ||
+		hasEntry(c.impersonateDelegates) ||
+		hasEntry(c.impersonateScopes)
+}
+
var (
// Dataset:
//
diff --git a/go/adbc/driver/bigquery/driver.go
b/go/adbc/driver/bigquery/driver.go
index 1fc238cf1..8c1777ae0 100644
--- a/go/adbc/driver/bigquery/driver.go
+++ b/go/adbc/driver/bigquery/driver.go
@@ -77,6 +77,38 @@ const (
AccessTokenEndpoint = "https://accounts.google.com/o/oauth2/token"
AccessTokenServerName = "google.com"
+
+ // OptionValueAuthTypeAppDefaultCredentials instructs the driver to
+ // authenticate using Application Default Credentials (ADC).
+ OptionValueAuthTypeAppDefaultCredentials =
"adbc.bigquery.sql.auth_type.app_default_credentials"
+
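+ // OptionValueAuthTypeJSONCredentials names an auth type for authenticating
+ // with JSON credentials. Note that databaseImpl.SetOption in this change
+ // does not accept this value for the auth type option (it returns an
+ // error); use OptionValueAuthTypeJSONCredentialFile or
+ // OptionValueAuthTypeJSONCredentialString to supply JSON credentials.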
+ OptionValueAuthTypeJSONCredentials =
"adbc.bigquery.sql.auth_type.json_credentials"
+
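+ // OptionValueAuthTypeOAuthClientIDs names an auth type for authenticating
+ // with an OAuth client ID and client secret. Note that
+ // databaseImpl.SetOption in this change does not accept this value for the
+ // auth type option (it returns an error); OAuth user authentication is
+ // configured with OptionValueAuthTypeUserAuthentication together with the
+ // client ID, client secret and refresh token options.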
+ OptionValueAuthTypeOAuthClientIDs =
"adbc.bigquery.sql.auth_type.oauth_client_ids"
+
+ // OptionStringImpersonateTargetPrincipal instructs the driver to
impersonate the
+ // given service account email.
+ OptionStringImpersonateTargetPrincipal =
"adbc.bigquery.sql.impersonate.target_principal"
+
+ // OptionStringImpersonateDelegates instructs the driver to impersonate
using the
+ // given comma-separated list of service account emails in the
delegation
+ // chain.
+ OptionStringImpersonateDelegates =
"adbc.bigquery.sql.impersonate.delegates"
+
+ // OptionStringImpersonateScopes instructs the driver to impersonate
using the
+ // given comma-separated list of OAuth 2.0 scopes.
+ OptionStringImpersonateScopes = "adbc.bigquery.sql.impersonate.scopes"
+
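+ // OptionStringImpersonateLifetime sets the lifetime of the impersonated
+ // token. The value is parsed with time.ParseDuration (e.g. "3600s" or
+ // "1h"). When impersonation is enabled and no (or a zero) lifetime is set,
+ // a default of one hour is used.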
+ OptionStringImpersonateLifetime =
"adbc.bigquery.sql.impersonate.lifetime"
)
var (
diff --git a/go/adbc/driver/bigquery/driver_test.go
b/go/adbc/driver/bigquery/driver_test.go
index f5d162ada..69c35b288 100644
--- a/go/adbc/driver/bigquery/driver_test.go
+++ b/go/adbc/driver/bigquery/driver_test.go
@@ -1551,3 +1551,41 @@ func (suite *BigQueryTests)
TestMetadataGetObjectsColumnsXdbc() {
}
var _ validation.DriverQuirks = (*BigQueryQuirks)(nil)
+
+// TestAuthTypeConsolidation tests that all auth type values are handled
+// correctly in the consolidated switch statement.
+func TestAuthTypeConsolidation(t *testing.T) {
+ mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+ defer mem.AssertSize(t, 0)
+
+ drv := driver.NewDriver(mem)
+ db, err := drv.NewDatabase(nil)
+ if err != nil {
+ t.Fatalf("Failed to create database: %v", err)
+ }
+ defer validation.CheckedClose(t, db)
+
+ // Test all valid auth types
+ validAuthTypes := []string{
+ driver.OptionValueAuthTypeDefault,
+ driver.OptionValueAuthTypeJSONCredentialFile,
+ driver.OptionValueAuthTypeJSONCredentialString,
+ driver.OptionValueAuthTypeUserAuthentication,
+ driver.OptionValueAuthTypeAppDefaultCredentials,
+ }
+
+ for _, authType := range validAuthTypes {
+ err :=
db.SetOptions(map[string]string{driver.OptionStringAuthType: authType})
+ if err != nil {
+ t.Errorf("Failed to set auth type %s: %v", authType,
err)
+ }
+ }
+
+ // Test invalid auth type
+ err = db.SetOptions(map[string]string{driver.OptionStringAuthType:
"invalid_auth_type"})
+ if err == nil {
+ t.Error("Expected error for invalid auth type")
+ } else if !strings.Contains(err.Error(), "unknown database auth type
value") {
+ t.Errorf("Expected error message to contain 'unknown database
auth type value', got: %v", err)
+ }
+}