zeroshade commented on code in PR #2651: URL: https://github.com/apache/arrow-adbc/pull/2651#discussion_r2017494290
########## go/adbc/driver/flightsql/flightsql_database.go: ########## @@ -149,7 +151,7 @@ func (d *databaseImpl) SetOptions(cnOptions map[string]string) error { if u, ok := cnOptions[adbc.OptionKeyUsername]; ok { if d.hdrs.Len() > 0 { return adbc.Error{ - Msg: "Authorization header already provided, do not provide user/pass also", + Msg: "Authentication conflict: Use either Authorization header OR username/password parameter", Review Comment: should all of these messages now say `Authorization header OR username/password OR token`? ########## go/adbc/driver/flightsql/flightsql_database.go: ########## @@ -160,14 +162,78 @@ func (d *databaseImpl) SetOptions(cnOptions map[string]string) error { if p, ok := cnOptions[adbc.OptionKeyPassword]; ok { if d.hdrs.Len() > 0 { return adbc.Error{ - Msg: "Authorization header already provided, do not provide user/pass also", + Msg: "Authentication conflict: Use either Authorization header OR username/password parameter", Code: adbc.StatusInvalidArgument, } } d.pass = p delete(cnOptions, adbc.OptionKeyPassword) } + // if token exists it can by pass or apply token exchange Review Comment: ```suggestion // if token exists it can bypass or apply token exchange ``` ########## go/adbc/adbc.go: ########## @@ -250,6 +250,7 @@ const ( OptionKeyURI = "uri" OptionKeyUsername = "username" OptionKeyPassword = "password" + OptionKeyToken = "token" Review Comment: Not sure if we should add this at the top level as a universal option like this unless we're going to incorporate the oauth flow into the base implementation for all the drivers to use. 
It might be better to keep this in the flightsql driver and have it be an `adbc.flightsql.*` option ########## go/adbc/driver/flightsql/flightsql_database.go: ########## @@ -160,14 +162,78 @@ func (d *databaseImpl) SetOptions(cnOptions map[string]string) error { if p, ok := cnOptions[adbc.OptionKeyPassword]; ok { if d.hdrs.Len() > 0 { return adbc.Error{ - Msg: "Authorization header already provided, do not provide user/pass also", + Msg: "Authentication conflict: Use either Authorization header OR username/password parameter", Code: adbc.StatusInvalidArgument, } } d.pass = p delete(cnOptions, adbc.OptionKeyPassword) } + // if token exists it can by pass or apply token exchange + // else check oauth flow + if t, ok := cnOptions[adbc.OptionKeyToken]; ok { + if d.hdrs.Len() > 0 { + return adbc.Error{ + Msg: "Authentication conflict: Use either Authorization header OR token parameter", + Code: adbc.StatusInvalidArgument, + } + } + + // if contains token. it can bypass or use token exchange + if flow, ok := cnOptions[OptionKeyOauthFlow]; ok { + var flowVal int + var err error + if flowVal, err = strconv.Atoi(flow); err != nil || flowVal != TokenExchange { + return adbc.Error{ + Msg: "unsupported option", + Code: adbc.StatusInvalidArgument, + } + } + + tokExchange, err := newTokenExchangeFlow(cnOptions) + if err != nil { + return err + } + d.oauthFlow = tokExchange + delete(cnOptions, OptionKeyOauthFlow) + } else { + d.token = t + delete(cnOptions, adbc.OptionKeyToken) + } Review Comment: I'm confused, below you error if the OauthFlow key is set and token is not empty. Shouldn't this be an error here? 
########## go/adbc/driver/flightsql/flightsql_database.go: ########## @@ -160,14 +162,78 @@ func (d *databaseImpl) SetOptions(cnOptions map[string]string) error { if p, ok := cnOptions[adbc.OptionKeyPassword]; ok { if d.hdrs.Len() > 0 { return adbc.Error{ - Msg: "Authorization header already provided, do not provide user/pass also", + Msg: "Authentication conflict: Use either Authorization header OR username/password parameter", Code: adbc.StatusInvalidArgument, } } d.pass = p delete(cnOptions, adbc.OptionKeyPassword) } + // if token exists it can by pass or apply token exchange + // else check oauth flow + if t, ok := cnOptions[adbc.OptionKeyToken]; ok { + if d.hdrs.Len() > 0 { + return adbc.Error{ + Msg: "Authentication conflict: Use either Authorization header OR token parameter", + Code: adbc.StatusInvalidArgument, + } + } + + // if contains token. it can bypass or use token exchange + if flow, ok := cnOptions[OptionKeyOauthFlow]; ok { + var flowVal int + var err error + if flowVal, err = strconv.Atoi(flow); err != nil || flowVal != TokenExchange { + return adbc.Error{ + Msg: "unsupported option", + Code: adbc.StatusInvalidArgument, + } + } + + tokExchange, err := newTokenExchangeFlow(cnOptions) + if err != nil { + return err + } + d.oauthFlow = tokExchange + delete(cnOptions, OptionKeyOauthFlow) + } else { + d.token = t + delete(cnOptions, adbc.OptionKeyToken) + } + } + + if flow, ok := cnOptions[OptionKeyOauthFlow]; ok { + if d.token != "" { + return adbc.Error{ + Msg: "Authentication conflict: Use either token parameter or OAuth flow", + Code: adbc.StatusInvalidArgument, + } + } + var flowVal int + var err error + if flowVal, err = strconv.Atoi(flow); err != nil { + return adbc.Error{ + Msg: fmt.Sprintf("invalid OAuth flow option: %s", flow), + Code: adbc.StatusInvalidArgument, + } + } Review Comment: ```suggestion flowVal, err := strconv.Atoi(flow) if err != nil { return adbc.Error{ Msg: fmt.Sprintf("invalid OAuth flow option: %s", flow), Code: 
adbc.StatusInvalidArgument, } } ``` ########## go/adbc/driver/flightsql/flightsql_database.go: ########## @@ -387,7 +453,19 @@ func getFlightClient(ctx context.Context, loc string, d *databaseImpl, authMiddl if len(authMiddle.hdrs.Get("authorization")) > 0 { d.Logger.DebugContext(ctx, "reusing auth token", "location", loc) } else { - if d.user != "" || d.pass != "" { + if d.token != "" { + authMiddle.mutex.Lock() + defer authMiddle.mutex.Unlock() + authMiddle.hdrs.Set("authorization", "Bearer "+d.token) Review Comment: why not do this in the options parsing? instead of passing the token around or even needing a separate option for it, just use the existing authorization option to add it as a header with `Bearer {token}`. That would be simpler, right? ########## go/adbc/driver/flightsql/flightsql_database.go: ########## @@ -160,14 +162,78 @@ func (d *databaseImpl) SetOptions(cnOptions map[string]string) error { if p, ok := cnOptions[adbc.OptionKeyPassword]; ok { if d.hdrs.Len() > 0 { return adbc.Error{ - Msg: "Authorization header already provided, do not provide user/pass also", + Msg: "Authentication conflict: Use either Authorization header OR username/password parameter", Code: adbc.StatusInvalidArgument, } } d.pass = p delete(cnOptions, adbc.OptionKeyPassword) } + // if token exists it can by pass or apply token exchange + // else check oauth flow + if t, ok := cnOptions[adbc.OptionKeyToken]; ok { + if d.hdrs.Len() > 0 { + return adbc.Error{ + Msg: "Authentication conflict: Use either Authorization header OR token parameter", + Code: adbc.StatusInvalidArgument, + } + } + + // if contains token. 
it can bypass or use token exchange + if flow, ok := cnOptions[OptionKeyOauthFlow]; ok { + var flowVal int + var err error + if flowVal, err = strconv.Atoi(flow); err != nil || flowVal != TokenExchange { + return adbc.Error{ + Msg: "unsupported option", + Code: adbc.StatusInvalidArgument, + } + } + + tokExchange, err := newTokenExchangeFlow(cnOptions) + if err != nil { + return err + } + d.oauthFlow = tokExchange + delete(cnOptions, OptionKeyOauthFlow) + } else { + d.token = t + delete(cnOptions, adbc.OptionKeyToken) + } + } + + if flow, ok := cnOptions[OptionKeyOauthFlow]; ok { + if d.token != "" { + return adbc.Error{ + Msg: "Authentication conflict: Use either token parameter or OAuth flow", + Code: adbc.StatusInvalidArgument, + } + } + var flowVal int + var err error + if flowVal, err = strconv.Atoi(flow); err != nil { + return adbc.Error{ + Msg: fmt.Sprintf("invalid OAuth flow option: %s", flow), + Code: adbc.StatusInvalidArgument, + } + } + switch flowVal { + case ClientCredentials: + cl, err := newClientCredentials(cnOptions) + if err != nil { + return err + } + d.oauthFlow = cl + delete(cnOptions, OptionKeyOauthFlow) + default: + return adbc.Error{ + Msg: fmt.Sprintf("oauth flow not implemented: %s", flow), + Code: adbc.StatusNotImplemented, + } + } Review Comment: if we're just passing it into a switch statement, why bother with the `strconv.Atoi`? 
just use the string value in the switch ########## go/adbc/driver/flightsql/flightsql_database.go: ########## @@ -387,7 +453,19 @@ func getFlightClient(ctx context.Context, loc string, d *databaseImpl, authMiddl if len(authMiddle.hdrs.Get("authorization")) > 0 { d.Logger.DebugContext(ctx, "reusing auth token", "location", loc) } else { - if d.user != "" || d.pass != "" { + if d.token != "" { + authMiddle.mutex.Lock() + defer authMiddle.mutex.Unlock() + authMiddle.hdrs.Set("authorization", "Bearer "+d.token) + } else if d.oauthFlow != nil { + token, err := d.oauthFlow.GetToken(ctx) + if err != nil { + return nil, adbcFromFlightStatusWithDetails(err, nil, nil, "Authenticate Oauth") + } + authMiddle.mutex.Lock() + defer authMiddle.mutex.Unlock() + authMiddle.hdrs.Set("authorization", "Bearer "+token.AccessToken) Review Comment: instead of repeating this so frequently, it might make more sense to add a method to the `bearerAuthMiddleware` object like `SetHeader` which does this. That way we can be sure that we're avoiding potentially forgetting to lock/unlock the mutex ########## go/adbc/driver/flightsql/flightsql_oauth.go: ########## @@ -0,0 +1,185 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package flightsql + +import ( + "context" + "fmt" + + "github.com/apache/arrow-adbc/go/adbc" + "golang.org/x/oauth2" +) + +type OauthAuthFlow interface { + GetToken(ctx context.Context) (*oauth2.Token, error) +} + +const ( + AuthPKCE = 1 << iota + ClientCredentials + TokenExchange +) + +type clientCredentials struct { + conf *oauth2.Config + token oauth2.TokenSource +} + +func newClientCredentials(options map[string]string) (*clientCredentials, error) { + clientId, ok := options[OptionKeyClientId] + if !ok { + return nil, fmt.Errorf("client credentials grant requires client_id") + } + + clientSecret, ok := options[OptionKeyClientSecret] + if !ok { + return nil, fmt.Errorf("client credentials grant requires client_secret") + } + + tokenURI, ok := options[OptionKeyTokenURI] + if !ok { + return nil, fmt.Errorf("client credentials grant requires token_uri") + } + + delete(options, OptionKeyClientId) + delete(options, OptionKeyClientSecret) + delete(options, OptionKeyTokenURI) + conf := &oauth2.Config{ + ClientID: clientId, + ClientSecret: clientSecret, + Endpoint: oauth2.Endpoint{ + TokenURL: tokenURI, + }, + } + + scopes := options[OptionKeyScope] + if scopes != "" { + conf.Scopes = []string{scopes} + delete(options, OptionKeyScope) + } + + return &clientCredentials{ + conf: conf, + }, nil +} + +func (c *clientCredentials) GetToken(ctx context.Context) (*oauth2.Token, error) { + option := []oauth2.AuthCodeOption{oauth2.SetAuthURLParam("grant_type", "client_credentials")} + if c.token == nil { + tok, err := c.conf.Exchange(ctx, "", option...) 
+ if err != nil { + return nil, err + } + + c.token = c.conf.TokenSource(ctx, tok) + return tok, nil + } + + return c.token.Token() +} + +type tokenExchange struct { + conf *oauth2.Config + tokenExchangeOptions []oauth2.AuthCodeOption + token oauth2.TokenSource +} + +func newTokenExchangeFlow(options map[string]string) (*tokenExchange, error) { + token, ok := options[adbc.OptionKeyToken] + if !ok { + return nil, fmt.Errorf("token Exchange grant requires token") + } + + subjectTokenType, ok := options[OptionKeySubjectTokenType] + if !ok { + return nil, fmt.Errorf("token Exchange grant requires subject token type") + } + + tokenURI, ok := options[OptionKeyTokenURI] + if !ok { + return nil, fmt.Errorf("token exchange grant requires token URI") + } + + tokOptions := []oauth2.AuthCodeOption{ + oauth2.SetAuthURLParam("grant_type", "urn:ietf:params:oauth:grant-type:token-exchange"), + oauth2.SetAuthURLParam("subject_token", token), + oauth2.SetAuthURLParam("subject_token_type", subjectTokenType), + } + + if actor, ok := options[OptionKeyActorToken]; ok { + tokOptions = append(tokOptions, oauth2.SetAuthURLParam("actor_token", actor)) + delete(options, OptionKeyActorToken) + if actorTokenType, ok := options[OptionKeyActorTokenType]; ok { + tokOptions = append(tokOptions, oauth2.SetAuthURLParam("actor_token_type", actorTokenType)) + delete(options, OptionKeyActorTokenType) + } else { + return nil, fmt.Errorf("token exchange grant requires actor_token_type") + } + } + + if reqTokenType, ok := options[OptionKeyReqTokenType]; ok { + tokOptions = append(tokOptions, oauth2.SetAuthURLParam("requested_token_type", reqTokenType)) + delete(options, OptionKeyReqTokenType) + } + + if aud, ok := options[OptionKeyExchangeAud]; ok { + tokOptions = append(tokOptions, oauth2.SetAuthURLParam("audience", aud)) + delete(options, OptionKeyExchangeAud) + } + + if resource, ok := options[OptionKeyExchangeResource]; ok { + tokOptions = append(tokOptions, oauth2.SetAuthURLParam("resource", 
resource)) + delete(options, OptionKeyExchangeResource) + } + + if scope, ok := options[OptionKeyExchangeScope]; ok { + tokOptions = append(tokOptions, oauth2.SetAuthURLParam("scope", scope)) + delete(options, OptionKeyExchangeScope) + } + + delete(options, OptionKeyTokenURI) + delete(options, OptionKeySubjectTokenType) + delete(options, adbc.OptionKeyToken) + + return &tokenExchange{ + conf: &oauth2.Config{ + Endpoint: oauth2.Endpoint{ + TokenURL: tokenURI, + }, + }, + tokenExchangeOptions: tokOptions, + }, nil +} + +func (f *tokenExchange) GetToken(ctx context.Context) (*oauth2.Token, error) { + if f.token == nil { + tok, err := f.conf.Exchange(ctx, "", f.tokenExchangeOptions...) + if err != nil { + return nil, err + } + + f.token = f.conf.TokenSource(ctx, tok) + return tok, nil + } + + token, err := f.token.Token() + if err != nil { + return nil, err + } + return token, nil Review Comment: What happens when the token expires? This will continue to return the no-longer-valid token instead of refreshing it, won't it? 
########## go/adbc/driver/flightsql/flightsql_database.go: ########## @@ -387,7 +453,19 @@ func getFlightClient(ctx context.Context, loc string, d *databaseImpl, authMiddl if len(authMiddle.hdrs.Get("authorization")) > 0 { d.Logger.DebugContext(ctx, "reusing auth token", "location", loc) } else { - if d.user != "" || d.pass != "" { + if d.token != "" { + authMiddle.mutex.Lock() + defer authMiddle.mutex.Unlock() + authMiddle.hdrs.Set("authorization", "Bearer "+d.token) + } else if d.oauthFlow != nil { + token, err := d.oauthFlow.GetToken(ctx) + if err != nil { + return nil, adbcFromFlightStatusWithDetails(err, nil, nil, "Authenticate Oauth") + } + authMiddle.mutex.Lock() + defer authMiddle.mutex.Unlock() + authMiddle.hdrs.Set("authorization", "Bearer "+token.AccessToken) Review Comment: this should be `token.Type() + " " + token.AccessToken` ########## go/adbc/driver/flightsql/flightsql_oauth.go: ########## @@ -0,0 +1,185 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package flightsql + +import ( + "context" + "fmt" + + "github.com/apache/arrow-adbc/go/adbc" + "golang.org/x/oauth2" +) + +type OauthAuthFlow interface { + GetToken(ctx context.Context) (*oauth2.Token, error) +} + +const ( + AuthPKCE = 1 << iota + ClientCredentials + TokenExchange +) + +type clientCredentials struct { + conf *oauth2.Config + token oauth2.TokenSource +} + +func newClientCredentials(options map[string]string) (*clientCredentials, error) { + clientId, ok := options[OptionKeyClientId] + if !ok { + return nil, fmt.Errorf("client credentials grant requires client_id") + } + + clientSecret, ok := options[OptionKeyClientSecret] + if !ok { + return nil, fmt.Errorf("client credentials grant requires client_secret") + } + + tokenURI, ok := options[OptionKeyTokenURI] + if !ok { + return nil, fmt.Errorf("client credentials grant requires token_uri") + } + + delete(options, OptionKeyClientId) + delete(options, OptionKeyClientSecret) + delete(options, OptionKeyTokenURI) + conf := &oauth2.Config{ + ClientID: clientId, + ClientSecret: clientSecret, + Endpoint: oauth2.Endpoint{ + TokenURL: tokenURI, + }, + } + + scopes := options[OptionKeyScope] + if scopes != "" { + conf.Scopes = []string{scopes} + delete(options, OptionKeyScope) + } + + return &clientCredentials{ + conf: conf, + }, nil +} + +func (c *clientCredentials) GetToken(ctx context.Context) (*oauth2.Token, error) { + option := []oauth2.AuthCodeOption{oauth2.SetAuthURLParam("grant_type", "client_credentials")} + if c.token == nil { + tok, err := c.conf.Exchange(ctx, "", option...) 
+ if err != nil { + return nil, err + } + + c.token = c.conf.TokenSource(ctx, tok) + return tok, nil + } + + return c.token.Token() +} + +type tokenExchange struct { + conf *oauth2.Config + tokenExchangeOptions []oauth2.AuthCodeOption + token oauth2.TokenSource +} + +func newTokenExchangeFlow(options map[string]string) (*tokenExchange, error) { + token, ok := options[adbc.OptionKeyToken] + if !ok { + return nil, fmt.Errorf("token Exchange grant requires token") + } + + subjectTokenType, ok := options[OptionKeySubjectTokenType] + if !ok { + return nil, fmt.Errorf("token Exchange grant requires subject token type") + } + + tokenURI, ok := options[OptionKeyTokenURI] + if !ok { + return nil, fmt.Errorf("token exchange grant requires token URI") + } + + tokOptions := []oauth2.AuthCodeOption{ + oauth2.SetAuthURLParam("grant_type", "urn:ietf:params:oauth:grant-type:token-exchange"), + oauth2.SetAuthURLParam("subject_token", token), + oauth2.SetAuthURLParam("subject_token_type", subjectTokenType), + } + + if actor, ok := options[OptionKeyActorToken]; ok { + tokOptions = append(tokOptions, oauth2.SetAuthURLParam("actor_token", actor)) + delete(options, OptionKeyActorToken) + if actorTokenType, ok := options[OptionKeyActorTokenType]; ok { + tokOptions = append(tokOptions, oauth2.SetAuthURLParam("actor_token_type", actorTokenType)) + delete(options, OptionKeyActorTokenType) + } else { + return nil, fmt.Errorf("token exchange grant requires actor_token_type") + } + } + + if reqTokenType, ok := options[OptionKeyReqTokenType]; ok { + tokOptions = append(tokOptions, oauth2.SetAuthURLParam("requested_token_type", reqTokenType)) + delete(options, OptionKeyReqTokenType) + } + + if aud, ok := options[OptionKeyExchangeAud]; ok { + tokOptions = append(tokOptions, oauth2.SetAuthURLParam("audience", aud)) + delete(options, OptionKeyExchangeAud) + } + + if resource, ok := options[OptionKeyExchangeResource]; ok { + tokOptions = append(tokOptions, oauth2.SetAuthURLParam("resource", 
resource)) + delete(options, OptionKeyExchangeResource) + } + + if scope, ok := options[OptionKeyExchangeScope]; ok { + tokOptions = append(tokOptions, oauth2.SetAuthURLParam("scope", scope)) + delete(options, OptionKeyExchangeScope) + } Review Comment: make a function that does this instead of repeating this over and over please -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org