This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b0e0545 Support reloading OAuth2 key file (#1441)
5b0e0545 is described below

commit 5b0e0545fc9ba4ad3b86c2e1d3615c007b59558f
Author: Zike Yang <[email protected]>
AuthorDate: Thu Nov 27 13:07:56 2025 +0800

    Support reloading OAuth2 key file (#1441)
---
 oauth2/auth_suite_test.go                 |  20 +++
 oauth2/cache/cache.go                     |  70 +++--------
 oauth2/client_credentials_flow.go         |  99 ++++++++-------
 oauth2/client_credentials_flow_test.go    |  19 ++-
 oauth2/store/keyring.go                   | 195 ------------------------------
 oauth2/store/memory.go                    |  87 -------------
 oauth2/store/store.go                     |  45 -------
 pulsar/auth/oauth2.go                     |  45 +------
 pulsar/auth/oauth2_test.go                |  93 +++++++++++++-
 pulsaradmin/pkg/admin/auth/oauth2.go      | 109 ++---------------
 pulsaradmin/pkg/admin/auth/oauth2_test.go |  32 ++---
 11 files changed, 216 insertions(+), 598 deletions(-)

diff --git a/oauth2/auth_suite_test.go b/oauth2/auth_suite_test.go
index 54b24299..0f81487f 100644
--- a/oauth2/auth_suite_test.go
+++ b/oauth2/auth_suite_test.go
@@ -58,6 +58,26 @@ func (te *MockTokenExchanger) ExchangeDeviceCode(_ context.Context,
        return te.ReturnsTokens, te.ReturnsError
 }
 
+type MockGrantProvider struct {
+       keyFile *KeyFile
+}
+
+func (mgp *MockGrantProvider) GetGrant(audience string, opts *ClientCredentialsFlowOptions) (
+       *AuthorizationGrant, error) {
+       scopes := []string{mgp.keyFile.Scope}
+       if opts != nil && len(opts.AdditionalScopes) > 0 {
+               scopes = append(scopes, opts.AdditionalScopes...)
+       }
+       return &AuthorizationGrant{
+               Type:              GrantTypeClientCredentials,
+               Audience:          audience,
+               ClientID:          mgp.keyFile.ClientID,
+               ClientCredentials: mgp.keyFile,
+               TokenEndpoint:     oidcEndpoints.TokenEndpoint,
+               Scopes:            scopes,
+       }, nil
+}
+
 var oidcEndpoints = OIDCWellKnownEndpoints{
        AuthorizationEndpoint:       "http://issuer/auth/authorize",
        TokenEndpoint:               "http://issuer/auth/token",
diff --git a/oauth2/cache/cache.go b/oauth2/cache/cache.go
index e2279843..df41efed 100644
--- a/oauth2/cache/cache.go
+++ b/oauth2/cache/cache.go
@@ -23,8 +23,6 @@ import (
        "time"
 
        "github.com/apache/pulsar-client-go/oauth2"
-       "github.com/apache/pulsar-client-go/oauth2/store"
-
        "github.com/apache/pulsar-client-go/oauth2/clock"
        xoauth2 "golang.org/x/oauth2"
 )
@@ -43,24 +41,24 @@ const (
 )
 
 // tokenCache implements a cache for the token associated with a specific audience.
-// it interacts with the store when the access token is near expiration or 
invalidated.
 // it is advisable to use a token cache instance per audience.
 type tokenCache struct {
-       clock     clock.Clock
-       lock      sync.Mutex
-       store     store.Store
-       audience  string
-       refresher oauth2.AuthorizationGrantRefresher
-       token     *xoauth2.Token
+       clock    clock.Clock
+       lock     sync.Mutex
+       audience string
+       token    *xoauth2.Token
+       flow     *oauth2.ClientCredentialsFlow
 }
 
-func NewDefaultTokenCache(store store.Store, audience string,
-       refresher oauth2.AuthorizationGrantRefresher) (CachingTokenSource, 
error) {
+func NewDefaultTokenCache(audience string,
+       flow *oauth2.ClientCredentialsFlow) (CachingTokenSource, error) {
+       if flow == nil {
+               return nil, fmt.Errorf("flow cannot be nil")
+       }
        cache := &tokenCache{
-               clock:     clock.RealClock{},
-               store:     store,
-               audience:  audience,
-               refresher: refresher,
+               clock:    clock.RealClock{},
+               audience: audience,
+               flow:     flow,
        }
        return cache, nil
 }
@@ -77,56 +75,24 @@ func (t *tokenCache) Token() (*xoauth2.Token, error) {
                return t.token, nil
        }
 
-       // load from the store and use the access token if it isn't expired
-       grant, err := t.store.LoadGrant(t.audience)
+       grant, err := t.flow.Authorize(t.audience)
        if err != nil {
-               return nil, fmt.Errorf("LoadGrant: %w", err)
-       }
-       t.token = grant.Token
-       if t.token != nil && t.validateAccessToken(*t.token) {
-               return t.token, nil
+               return nil, err
        }
-
-       // obtain and cache a fresh access token
-       grant, err = t.refresher.Refresh(grant)
-       if err != nil {
-               return nil, fmt.Errorf("RefreshGrant: %w", err)
+       if grant.Token == nil {
+               return nil, fmt.Errorf("authorization succeeded but no token was returned")
        }
        t.token = grant.Token
-       err = t.store.SaveGrant(t.audience, *grant)
-       if err != nil {
-               // TODO log rather than throw
-               return nil, fmt.Errorf("SaveGrant: %w", err)
-       }
 
        return t.token, nil
 }
 
-// InvalidateToken clears the access token (likely due to a response from the 
resource server).
-// Note that the token within the grant may contain a refresh token which 
should survive.
+// InvalidateToken clears the cached access token (likely due to a response from the resource server).
 func (t *tokenCache) InvalidateToken() error {
        t.lock.Lock()
        defer t.lock.Unlock()
 
-       previous := t.token
        t.token = nil
-
-       // clear from the store the access token that was returned earlier 
(unless the store has since been updated)
-       if previous == nil || previous.AccessToken == "" {
-               return nil
-       }
-       grant, err := t.store.LoadGrant(t.audience)
-       if err != nil {
-               return fmt.Errorf("LoadGrant: %w", err)
-       }
-       if grant.Token != nil && grant.Token.AccessToken == 
previous.AccessToken {
-               grant.Token.Expiry = time.Unix(0, 0).Add(expiryDelta)
-               err = t.store.SaveGrant(t.audience, *grant)
-               if err != nil {
-                       // TODO log rather than throw
-                       return fmt.Errorf("SaveGrant: %w", err)
-               }
-       }
        return nil
 }
 
diff --git a/oauth2/client_credentials_flow.go b/oauth2/client_credentials_flow.go
index 9a2643a9..c45dc821 100644
--- a/oauth2/client_credentials_flow.go
+++ b/oauth2/client_credentials_flow.go
@@ -30,11 +30,10 @@ import (
 // ClientCredentialsFlow takes care of the mechanics needed for getting an 
access
 // token using the OAuth 2.0 "Client Credentials Flow"
 type ClientCredentialsFlow struct {
-       options                ClientCredentialsFlowOptions
-       oidcWellKnownEndpoints OIDCWellKnownEndpoints
-       keyfile                *KeyFile
-       exchanger              ClientCredentialsExchanger
-       clock                  clock.Clock
+       options       ClientCredentialsFlowOptions
+       exchanger     ClientCredentialsExchanger
+       grantProvider GrantProvider
+       clock         clock.Clock
 }
 
 // ClientCredentialsProvider abstracts getting client credentials
@@ -47,29 +46,24 @@ type ClientCredentialsExchanger interface {
        ExchangeClientCredentials(req ClientCredentialsExchangeRequest) (*TokenResult, error)
 }
 
+// GrantProvider abstracts the creation of authorization grants from 
credentials
+type GrantProvider interface {
+       GetGrant(audience string, options *ClientCredentialsFlowOptions) (*AuthorizationGrant, error)
+}
+
 type ClientCredentialsFlowOptions struct {
        KeyFile          string
        AdditionalScopes []string
 }
 
-func newClientCredentialsFlow(
-       options ClientCredentialsFlowOptions,
-       keyfile *KeyFile,
-       oidcWellKnownEndpoints OIDCWellKnownEndpoints,
-       exchanger ClientCredentialsExchanger,
-       clock clock.Clock) *ClientCredentialsFlow {
-       return &ClientCredentialsFlow{
-               options:                options,
-               oidcWellKnownEndpoints: oidcWellKnownEndpoints,
-               keyfile:                keyfile,
-               exchanger:              exchanger,
-               clock:                  clock,
-       }
+// DefaultGrantProvider provides authorization grants by loading credentials 
from a key file
+type DefaultGrantProvider struct {
 }
 
-// NewDefaultClientCredentialsFlow provides an easy way to build up a default
-// client credentials flow with all the correct configuration.
-func NewDefaultClientCredentialsFlow(options ClientCredentialsFlowOptions) 
(*ClientCredentialsFlow, error) {
+// GetGrant creates an authorization grant by loading credentials from the key 
file and
+// merging the scopes from both the options and the key file configuration
+func (p *DefaultGrantProvider) GetGrant(audience string, options *ClientCredentialsFlowOptions) (
+       *AuthorizationGrant, error) {
        credsProvider := NewClientCredentialsProviderFromKeyFile(options.KeyFile)
        keyFile, err := credsProvider.GetClientCredentials()
        if err != nil {
@@ -80,39 +74,58 @@ func NewDefaultClientCredentialsFlow(options 
ClientCredentialsFlowOptions) (*Cli
        if err != nil {
                return nil, err
        }
+       // Merge the scopes of the options AdditionalScopes with the scopes 
read from the keyFile config
+       var scopesToAdd []string
+       if len(options.AdditionalScopes) > 0 {
+               scopesToAdd = append(scopesToAdd, options.AdditionalScopes...)
+       }
+
+       if keyFile.Scope != "" {
+               scopesSplit := strings.Fields(keyFile.Scope)
+               scopesToAdd = append(scopesToAdd, scopesSplit...)
+       }
+
+       return &AuthorizationGrant{
+               Type:              GrantTypeClientCredentials,
+               Audience:          audience,
+               ClientID:          keyFile.ClientID,
+               ClientCredentials: keyFile,
+               TokenEndpoint:     wellKnownEndpoints.TokenEndpoint,
+               Scopes:            scopesToAdd,
+       }, nil
+}
+
+func newClientCredentialsFlow(
+       options ClientCredentialsFlowOptions,
+       exchanger ClientCredentialsExchanger,
+       grantProvider GrantProvider,
+       clock clock.Clock) *ClientCredentialsFlow {
+       return &ClientCredentialsFlow{
+               options:       options,
+               exchanger:     exchanger,
+               grantProvider: grantProvider,
+               clock:         clock,
+       }
+}
+
+// NewDefaultClientCredentialsFlow provides an easy way to build up a default
+// client credentials flow with all the correct configuration.
+func NewDefaultClientCredentialsFlow(options ClientCredentialsFlowOptions) (*ClientCredentialsFlow, error) {
 
        tokenRetriever := NewTokenRetriever(&http.Client{})
        return newClientCredentialsFlow(
                options,
-               keyFile,
-               *wellKnownEndpoints,
                tokenRetriever,
+               &DefaultGrantProvider{},
                clock.RealClock{}), nil
 }
 
 var _ Flow = &ClientCredentialsFlow{}
 
 func (c *ClientCredentialsFlow) Authorize(audience string) (*AuthorizationGrant, error) {
-       var err error
-
-       // Merge the scopes of the options AdditionalScopes with the scopes 
read from the keyFile config
-       var scopesToAdd []string
-       if len(c.options.AdditionalScopes) > 0 {
-               scopesToAdd = append(scopesToAdd, c.options.AdditionalScopes...)
-       }
-
-       if c.keyfile.Scope != "" {
-               scopesSplit := strings.Split(c.keyfile.Scope, " ")
-               scopesToAdd = append(scopesToAdd, scopesSplit...)
-       }
-
-       grant := &AuthorizationGrant{
-               Type:              GrantTypeClientCredentials,
-               Audience:          audience,
-               ClientID:          c.keyfile.ClientID,
-               ClientCredentials: c.keyfile,
-               TokenEndpoint:     c.oidcWellKnownEndpoints.TokenEndpoint,
-               Scopes:            scopesToAdd,
+       grant, err := c.grantProvider.GetGrant(audience, &c.options)
+       if err != nil {
+               return nil, err
        }
 
        // test the credentials and obtain an initial access token
diff --git a/oauth2/client_credentials_flow_test.go 
b/oauth2/client_credentials_flow_test.go
index 8fd0a110..97ad8710 100644
--- a/oauth2/client_credentials_flow_test.go
+++ b/oauth2/client_credentials_flow_test.go
@@ -54,6 +54,7 @@ var _ = ginkgo.Describe("ClientCredentialsFlow", func() {
 
                var mockClock clock.Clock
                var mockTokenExchanger *MockTokenExchanger
+               var mockGrantProvider *MockGrantProvider
 
                ginkgo.BeforeEach(func() {
                        mockClock = testing.NewFakeClock(time.Unix(0, 0))
@@ -61,18 +62,18 @@ var _ = ginkgo.Describe("ClientCredentialsFlow", func() {
                        mockTokenExchanger = &MockTokenExchanger{
                                ReturnsTokens: &expectedTokens,
                        }
+                       mockGrantProvider = &MockGrantProvider{
+                               keyFile: &clientCredentials,
+                       }
                })
 
                ginkgo.It("invokes TokenExchanger with credentials", func() {
-                       additionalScope := "additional_scope"
                        provider := newClientCredentialsFlow(
                                ClientCredentialsFlowOptions{
-                                       KeyFile:          "test_keyfile",
-                                       AdditionalScopes: 
[]string{additionalScope},
+                                       KeyFile: "test_keyfile",
                                },
-                               &clientCredentials,
-                               oidcEndpoints,
                                mockTokenExchanger,
+                               mockGrantProvider,
                                mockClock,
                        )
 
@@ -83,7 +84,7 @@ var _ = ginkgo.Describe("ClientCredentialsFlow", func() {
                                ClientID:      clientCredentials.ClientID,
                                ClientSecret:  clientCredentials.ClientSecret,
                                Audience:      "test_audience",
-                               Scopes:        []string{additionalScope, 
clientCredentials.Scope},
+                               Scopes:        
[]string{clientCredentials.Scope},
                        }))
                })
 
@@ -92,9 +93,8 @@ var _ = ginkgo.Describe("ClientCredentialsFlow", func() {
                                ClientCredentialsFlowOptions{
                                        KeyFile: "test_keyfile",
                                },
-                               &clientCredentials,
-                               oidcEndpoints,
                                mockTokenExchanger,
+                               mockGrantProvider,
                                mockClock,
                        )
 
@@ -112,9 +112,8 @@ var _ = ginkgo.Describe("ClientCredentialsFlow", func() {
                                ClientCredentialsFlowOptions{
                                        KeyFile: "test_keyfile",
                                },
-                               &clientCredentials,
-                               oidcEndpoints,
                                mockTokenExchanger,
+                               mockGrantProvider,
                                mockClock,
                        )
 
diff --git a/oauth2/store/keyring.go b/oauth2/store/keyring.go
deleted file mode 100644
index 70fba5b0..00000000
--- a/oauth2/store/keyring.go
+++ /dev/null
@@ -1,195 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package store
-
-import (
-       "crypto/sha1"
-       "encoding/json"
-       "errors"
-       "fmt"
-       "sync"
-
-       "github.com/99designs/keyring"
-       "github.com/apache/pulsar-client-go/oauth2"
-       "github.com/apache/pulsar-client-go/oauth2/clock"
-)
-
-type KeyringStore struct {
-       kr    keyring.Keyring
-       clock clock.Clock
-       lock  sync.Mutex
-}
-
-// storedItem represents an item stored in the keyring
-type storedItem struct {
-       Audience string
-       UserName string
-       Grant    oauth2.AuthorizationGrant
-}
-
-// NewKeyringStore creates a store based on a keyring.
-func NewKeyringStore(kr keyring.Keyring) (*KeyringStore, error) {
-       return &KeyringStore{
-               kr:    kr,
-               clock: clock.RealClock{},
-       }, nil
-}
-
-var _ Store = &KeyringStore{}
-
-func (f *KeyringStore) SaveGrant(audience string, grant 
oauth2.AuthorizationGrant) error {
-       f.lock.Lock()
-       defer f.lock.Unlock()
-
-       var err error
-       var userName string
-       switch grant.Type {
-       case oauth2.GrantTypeClientCredentials:
-               if grant.ClientCredentials == nil {
-                       return ErrUnsupportedAuthData
-               }
-               userName = grant.ClientCredentials.ClientEmail
-       case oauth2.GrantTypeDeviceCode:
-               if grant.Token == nil {
-                       return ErrUnsupportedAuthData
-               }
-               userName, err = oauth2.ExtractUserName(*grant.Token)
-               if err != nil {
-                       return err
-               }
-       default:
-               return ErrUnsupportedAuthData
-       }
-       item := storedItem{
-               Audience: audience,
-               UserName: userName,
-               Grant:    grant,
-       }
-       err = f.setItem(item)
-       if err != nil {
-               return err
-       }
-       return nil
-}
-
-func (f *KeyringStore) LoadGrant(audience string) (*oauth2.AuthorizationGrant, 
error) {
-       f.lock.Lock()
-       defer f.lock.Unlock()
-
-       item, err := f.getItem(audience)
-       if err != nil {
-               if errors.Is(err, keyring.ErrKeyNotFound) {
-                       return nil, ErrNoAuthenticationData
-               }
-               return nil, err
-       }
-       switch item.Grant.Type {
-       case oauth2.GrantTypeClientCredentials:
-               if item.Grant.ClientCredentials == nil {
-                       return nil, ErrUnsupportedAuthData
-               }
-       case oauth2.GrantTypeDeviceCode:
-               if item.Grant.Token == nil {
-                       return nil, ErrUnsupportedAuthData
-               }
-       default:
-               return nil, ErrUnsupportedAuthData
-       }
-       return &item.Grant, nil
-}
-
-func (f *KeyringStore) WhoAmI(audience string) (string, error) {
-       f.lock.Lock()
-       defer f.lock.Unlock()
-
-       key := hashKeyringKey(audience)
-       authItem, err := f.kr.Get(key)
-       if err != nil {
-               if errors.Is(err, keyring.ErrKeyNotFound) {
-                       return "", ErrNoAuthenticationData
-               }
-               return "", fmt.Errorf("unable to get information from the 
keyring: %w", err)
-       }
-       return authItem.Label, nil
-}
-
-func (f *KeyringStore) Logout() error {
-       f.lock.Lock()
-       defer f.lock.Unlock()
-
-       var err error
-       keys, err := f.kr.Keys()
-       if err != nil {
-               return fmt.Errorf("unable to get information from the keyring: 
%w", err)
-       }
-       for _, key := range keys {
-               err = f.kr.Remove(key)
-       }
-       if err != nil {
-               return fmt.Errorf("unable to update the keyring: %w", err)
-       }
-       return nil
-}
-
-func (f *KeyringStore) getItem(audience string) (storedItem, error) {
-       key := hashKeyringKey(audience)
-       i, err := f.kr.Get(key)
-       if err != nil {
-               return storedItem{}, err
-       }
-       var grant oauth2.AuthorizationGrant
-       err = json.Unmarshal(i.Data, &grant)
-       if err != nil {
-               // the grant appears to be invalid
-               return storedItem{}, ErrUnsupportedAuthData
-       }
-       return storedItem{
-               Audience: audience,
-               UserName: i.Label,
-               Grant:    grant,
-       }, nil
-}
-
-func (f *KeyringStore) setItem(item storedItem) error {
-       key := hashKeyringKey(item.Audience)
-       data, err := json.Marshal(item.Grant)
-       if err != nil {
-               return err
-       }
-       i := keyring.Item{
-               Key:                         key,
-               Data:                        data,
-               Label:                       item.UserName,
-               Description:                 "authorization grant",
-               KeychainNotTrustApplication: false,
-               KeychainNotSynchronizable:   false,
-       }
-       err = f.kr.Set(i)
-       if err != nil {
-               return fmt.Errorf("unable to update the keyring: %w", err)
-       }
-       return nil
-}
-
-// hashKeyringKey creates a safe key based on the given string
-func hashKeyringKey(s string) string {
-       h := sha1.New()
-       h.Write([]byte(s))
-       bs := h.Sum(nil)
-       return fmt.Sprintf("%x", bs)
-}
diff --git a/oauth2/store/memory.go b/oauth2/store/memory.go
deleted file mode 100644
index 07c75947..00000000
--- a/oauth2/store/memory.go
+++ /dev/null
@@ -1,87 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package store
-
-import (
-       "sync"
-
-       "github.com/apache/pulsar-client-go/oauth2"
-       "github.com/apache/pulsar-client-go/oauth2/clock"
-)
-
-type MemoryStore struct {
-       clock  clock.Clock
-       lock   sync.Mutex
-       grants map[string]*oauth2.AuthorizationGrant
-}
-
-func NewMemoryStore() Store {
-       return &MemoryStore{
-               clock:  clock.RealClock{},
-               grants: make(map[string]*oauth2.AuthorizationGrant),
-       }
-}
-
-var _ Store = &MemoryStore{}
-
-func (f *MemoryStore) SaveGrant(audience string, grant 
oauth2.AuthorizationGrant) error {
-       f.lock.Lock()
-       defer f.lock.Unlock()
-       f.grants[audience] = &grant
-       return nil
-}
-
-func (f *MemoryStore) LoadGrant(audience string) (*oauth2.AuthorizationGrant, 
error) {
-       f.lock.Lock()
-       defer f.lock.Unlock()
-       grant, ok := f.grants[audience]
-       if !ok {
-               return nil, ErrNoAuthenticationData
-       }
-       return grant, nil
-}
-
-func (f *MemoryStore) WhoAmI(audience string) (string, error) {
-       f.lock.Lock()
-       defer f.lock.Unlock()
-       grant, ok := f.grants[audience]
-       if !ok {
-               return "", ErrNoAuthenticationData
-       }
-       switch grant.Type {
-       case oauth2.GrantTypeClientCredentials:
-               if grant.ClientCredentials == nil {
-                       return "", ErrUnsupportedAuthData
-               }
-               return grant.ClientCredentials.ClientEmail, nil
-       case oauth2.GrantTypeDeviceCode:
-               if grant.Token == nil {
-                       return "", ErrUnsupportedAuthData
-               }
-               return oauth2.ExtractUserName(*grant.Token)
-       default:
-               return "", ErrUnsupportedAuthData
-       }
-}
-
-func (f *MemoryStore) Logout() error {
-       f.lock.Lock()
-       defer f.lock.Unlock()
-       f.grants = map[string]*oauth2.AuthorizationGrant{}
-       return nil
-}
diff --git a/oauth2/store/store.go b/oauth2/store/store.go
deleted file mode 100644
index 5e916920..00000000
--- a/oauth2/store/store.go
+++ /dev/null
@@ -1,45 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package store
-
-import (
-       "errors"
-
-       "github.com/apache/pulsar-client-go/oauth2"
-)
-
-// ErrNoAuthenticationData indicates that stored authentication data is not 
available
-var ErrNoAuthenticationData = errors.New("authentication data is not 
available")
-
-// ErrUnsupportedAuthData indicates that stored authentication data is unusable
-var ErrUnsupportedAuthData = errors.New("authentication data is not usable")
-
-// Store is responsible for persisting authorization grants
-type Store interface {
-       // SaveGrant stores an authorization grant for a given audience
-       SaveGrant(audience string, grant oauth2.AuthorizationGrant) error
-
-       // LoadGrant loads an authorization grant for a given audience
-       LoadGrant(audience string) (*oauth2.AuthorizationGrant, error)
-
-       // WhoAmI returns the current user name (or an error if nobody is 
logged in)
-       WhoAmI(audience string) (string, error)
-
-       // Logout deletes all stored credentials
-       Logout() error
-}
diff --git a/pulsar/auth/oauth2.go b/pulsar/auth/oauth2.go
index e162e4ed..e35a15b2 100644
--- a/pulsar/auth/oauth2.go
+++ b/pulsar/auth/oauth2.go
@@ -28,7 +28,6 @@ import (
        "github.com/apache/pulsar-client-go/oauth2"
        "github.com/apache/pulsar-client-go/oauth2/cache"
        "github.com/apache/pulsar-client-go/oauth2/clock"
-       "github.com/apache/pulsar-client-go/oauth2/store"
 )
 
 const (
@@ -44,10 +43,10 @@ const (
 type oauth2AuthProvider struct {
        clock            clock.Clock
        issuer           oauth2.Issuer
-       store            store.Store
        source           cache.CachingTokenSource
        defaultTransport http.RoundTripper
        tokenTransport   *transport
+       flow             *oauth2.ClientCredentialsFlow
 }
 
 // NewAuthenticationOAuth2WithParams return a interface of Provider with 
string map.
@@ -58,8 +57,6 @@ func NewAuthenticationOAuth2WithParams(params 
map[string]string) (Provider, erro
                Audience:       params[ConfigParamAudience],
        }
 
-       // initialize a store of authorization grants
-       st := store.NewMemoryStore()
        switch params[ConfigParamType] {
        case ConfigParamTypeClientCredentials:
                flow, err := 
oauth2.NewDefaultClientCredentialsFlow(oauth2.ClientCredentialsFlowOptions{
@@ -69,46 +66,25 @@ func NewAuthenticationOAuth2WithParams(params 
map[string]string) (Provider, erro
                if err != nil {
                        return nil, err
                }
-               grant, err := flow.Authorize(issuer.Audience)
-               if err != nil {
-                       return nil, err
-               }
-               err = st.SaveGrant(issuer.Audience, *grant)
-               if err != nil {
-                       return nil, err
-               }
+               return NewAuthenticationOAuth2(issuer, flow), nil
        default:
                return nil, fmt.Errorf("unsupported authentication type: %s", 
params[ConfigParamType])
        }
-
-       return NewAuthenticationOAuth2(issuer, st), nil
 }
 
 func NewAuthenticationOAuth2(
        issuer oauth2.Issuer,
-       store store.Store) Provider {
+       flow *oauth2.ClientCredentialsFlow) Provider {
 
        return &oauth2AuthProvider{
                clock:  clock.RealClock{},
                issuer: issuer,
-               store:  store,
+               flow:   flow,
        }
 }
 
 func (p *oauth2AuthProvider) Init() error {
-       grant, err := p.store.LoadGrant(p.issuer.Audience)
-       if err != nil {
-               if err == store.ErrNoAuthenticationData {
-                       return nil
-               }
-               return err
-       }
-       refresher, err := p.getRefresher(grant.Type)
-       if err != nil {
-               return err
-       }
-
-       source, err := cache.NewDefaultTokenCache(p.store, p.issuer.Audience, 
refresher)
+       source, err := cache.NewDefaultTokenCache(p.issuer.Audience, p.flow)
        if err != nil {
                return err
        }
@@ -140,17 +116,6 @@ func (p *oauth2AuthProvider) Close() error {
        return nil
 }
 
-func (p *oauth2AuthProvider) getRefresher(t oauth2.AuthorizationGrantType) 
(oauth2.AuthorizationGrantRefresher, error) {
-       switch t {
-       case oauth2.GrantTypeClientCredentials:
-               return oauth2.NewDefaultClientCredentialsGrantRefresher(p.clock)
-       case oauth2.GrantTypeDeviceCode:
-               return 
oauth2.NewDefaultDeviceAuthorizationGrantRefresher(p.clock)
-       default:
-               return nil, store.ErrUnsupportedAuthData
-       }
-}
-
 type transport struct {
        source  cache.CachingTokenSource
        wrapped *xoauth2.Transport
diff --git a/pulsar/auth/oauth2_test.go b/pulsar/auth/oauth2_test.go
index cc3d11c1..ecaad51f 100644
--- a/pulsar/auth/oauth2_test.go
+++ b/pulsar/auth/oauth2_test.go
@@ -22,12 +22,18 @@ import (
        "net/http"
        "net/http/httptest"
        "os"
+       "sync/atomic"
        "testing"
 
+       "github.com/apache/pulsar-client-go/oauth2"
        "github.com/pkg/errors"
        "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
 )
 
+var expectedClientID atomic.Value
+var expectedClientSecret atomic.Value
+
 // mockOAuthServer will mock a oauth service for the tests
 func mockOAuthServer() *httptest.Server {
        // prepare a port for the mocked server
@@ -44,7 +50,17 @@ func mockOAuthServer() *httptest.Server {
 }`, server.URL, server.URL, server.URL, server.URL)
                fmt.Fprintln(writer, s)
        })
-       mockedHandler.HandleFunc("/oauth/token", func(writer 
http.ResponseWriter, _ *http.Request) {
+       mockedHandler.HandleFunc("/oauth/token", func(writer 
http.ResponseWriter, r *http.Request) {
+               if err := r.ParseForm(); err != nil {
+                       http.Error(writer, "invalid form", 
http.StatusBadRequest)
+                       return
+               }
+               clientID := r.FormValue("client_id")
+               clientSecret := r.FormValue("client_secret")
+               if clientID != expectedClientID.Load().(string) || clientSecret 
!= expectedClientSecret.Load().(string) {
+                       http.Error(writer, "invalid client credentials", 
http.StatusUnauthorized)
+                       return
+               }
                fmt.Fprintln(writer, "{\n  \"access_token\": 
\"token-content\",\n  \"token_type\": \"Bearer\"\n}")
        })
        mockedHandler.HandleFunc("/authorize", func(writer http.ResponseWriter, 
_ *http.Request) {
@@ -72,7 +88,8 @@ func mockKeyFile(server string) (string, error) {
   "client_id":"client-id",
   "client_secret":"client-secret",
   "client_email":"[email protected]",
-  "issuer_url":"%s"
+  "issuer_url":"%s",
+  "scope": "test-scope"
 }`, server))
        if err != nil {
                return "", err
@@ -84,6 +101,8 @@ func mockKeyFile(server string) (string, error) {
 func TestNewAuthenticationOAuth2WithParams(t *testing.T) {
        server := mockOAuthServer()
        defer server.Close()
+       expectedClientID.Store("client-id")
+       expectedClientSecret.Store("client-secret")
        kf, err := mockKeyFile(server.URL)
        defer os.Remove(kf)
        if err != nil {
@@ -142,3 +161,73 @@ func TestNewAuthenticationOAuth2WithParams(t *testing.T) {
                assert.Equal(t, "token-content", string(token))
        }
 }
+
+func TestOAuth2KeyFileReloading(t *testing.T) {
+       server := mockOAuthServer()
+       defer server.Close()
+       expectedClientID.Store("client-id")
+       expectedClientSecret.Store("client-secret")
+       kf, err := mockKeyFile(server.URL)
+       defer os.Remove(kf)
+       require.NoError(t, err)
+
+       params := map[string]string{
+               ConfigParamType:      ConfigParamTypeClientCredentials,
+               ConfigParamIssuerURL: server.URL,
+               ConfigParamClientID:  "client-id",
+               ConfigParamAudience:  "audience",
+               ConfigParamKeyFile:   fmt.Sprintf("file://%s", kf),
+               ConfigParamScope:     "profile",
+       }
+
+       auth, err := NewAuthenticationOAuth2WithParams(params)
+       require.NoError(t, err)
+       err = auth.Init()
+       require.NoError(t, err)
+
+       token, err := auth.GetData()
+       require.NoError(t, err)
+       assert.Equal(t, "token-content", string(token))
+
+       expectedClientSecret.Store("new-client-secret")
+       _, err = auth.GetData()
+       require.Error(t, err) // The token refresh should fail once the 
mock server expects a different client secret
+
+       // now update the key file to have different client credentials
+       keyFile, err := os.OpenFile(kf, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 
0644)
+       require.NoError(t, err)
+       _, err = keyFile.WriteString(fmt.Sprintf(`{
+  "type":"resource",
+  "client_id":"client-id",
+  "client_secret":"new-client-secret",
+  "client_email":"[email protected]",
+  "issuer_url":"%s"
+}`, server.URL))
+       require.NoError(t, err)
+       require.NoError(t, keyFile.Close())
+
+       token, err = auth.GetData()
+       if err != nil {
+               t.Fatal(err)
+       }
+       assert.Equal(t, "token-content", string(token))
+}
+
+func TestGrantProviderScopes(t *testing.T) {
+       expectedClientID.Store("client-id")
+       expectedClientSecret.Store("client-secret")
+       server := mockOAuthServer()
+       defer server.Close()
+       kf, err := mockKeyFile(server.URL)
+       defer os.Remove(kf)
+       require.NoError(t, err)
+
+       grantProvider := oauth2.DefaultGrantProvider{}
+       grant, err := grantProvider.GetGrant("test-audience", 
&oauth2.ClientCredentialsFlowOptions{
+               KeyFile:          kf,
+               AdditionalScopes: []string{"scope1", "scope2"},
+       })
+       require.NoError(t, err)
+
+       assert.Equal(t, []string{"scope1", "scope2", "test-scope"}, 
grant.Scopes)
+}
diff --git a/pulsaradmin/pkg/admin/auth/oauth2.go 
b/pulsaradmin/pkg/admin/auth/oauth2.go
index 59587abf..9536f09a 100644
--- a/pulsaradmin/pkg/admin/auth/oauth2.go
+++ b/pulsaradmin/pkg/admin/auth/oauth2.go
@@ -19,18 +19,11 @@ package auth
 
 import (
        "encoding/json"
-       "fmt"
        "net/http"
-       "path/filepath"
 
-       "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
-
-       "github.com/99designs/keyring"
        "github.com/apache/pulsar-client-go/oauth2"
        "github.com/apache/pulsar-client-go/oauth2/cache"
        clock2 "github.com/apache/pulsar-client-go/oauth2/clock"
-       "github.com/apache/pulsar-client-go/oauth2/store"
-       "github.com/pkg/errors"
        xoauth2 "golang.org/x/oauth2"
 )
 
@@ -50,25 +43,10 @@ type OAuth2ClientCredentials struct {
 type OAuth2Provider struct {
        clock            clock2.RealClock
        issuer           oauth2.Issuer
-       store            store.Store
        source           cache.CachingTokenSource
        defaultTransport http.RoundTripper
        tokenTransport   *transport
-}
-
-func NewAuthenticationOAuth2(issuer oauth2.Issuer, store store.Store) 
(*OAuth2Provider, error) {
-       p := &OAuth2Provider{
-               clock:  clock2.RealClock{},
-               issuer: issuer,
-               store:  store,
-       }
-
-       err := p.loadGrant()
-       if err != nil {
-               return nil, err
-       }
-
-       return p, nil
+       flow             *oauth2.ClientCredentialsFlow
 }
 
 // NewAuthenticationOAuth2WithDefaultFlow uses memory to save the grant
@@ -80,29 +58,18 @@ func NewAuthenticationOAuth2WithDefaultFlow(issuer 
oauth2.Issuer, keyFile string
 
 func NewAuthenticationOAuth2WithFlow(
        issuer oauth2.Issuer, flowOptions oauth2.ClientCredentialsFlowOptions) 
(Provider, error) {
-       st := store.NewMemoryStore()
        flow, err := oauth2.NewDefaultClientCredentialsFlow(flowOptions)
        if err != nil {
                return nil, err
        }
 
-       grant, err := flow.Authorize(issuer.Audience)
-       if err != nil {
-               return nil, err
-       }
-
-       err = st.SaveGrant(issuer.Audience, *grant)
-       if err != nil {
-               return nil, err
-       }
-
        p := &OAuth2Provider{
                clock:  clock2.RealClock{},
                issuer: issuer,
-               store:  st,
+               flow:   flow,
        }
 
-       return p, p.loadGrant()
+       return p, p.initCache()
 }
 
 func NewAuthenticationOAuth2FromAuthParams(encodedAuthParam string,
@@ -114,14 +81,14 @@ func 
NewAuthenticationOAuth2FromAuthParams(encodedAuthParam string,
                return nil, err
        }
        return NewAuthenticationOAuth2WithParams(paramsJSON.IssuerURL, 
paramsJSON.ClientID, paramsJSON.Audience,
-               paramsJSON.Scope, transport)
+               paramsJSON.PrivateKey, transport)
 }
 
 func NewAuthenticationOAuth2WithParams(
        issuerEndpoint,
        clientID,
        audience string,
-       _ string,
+       privateKey string,
        transport http.RoundTripper) (*OAuth2Provider, error) {
 
        issuer := oauth2.Issuer{
@@ -130,7 +97,7 @@ func NewAuthenticationOAuth2WithParams(
                Audience:       audience,
        }
 
-       keyringStore, err := MakeKeyringStore()
+       flow, err := 
oauth2.NewDefaultClientCredentialsFlow(oauth2.ClientCredentialsFlowOptions{KeyFile:
 privateKey})
        if err != nil {
                return nil, err
        }
@@ -138,11 +105,11 @@ func NewAuthenticationOAuth2WithParams(
        p := &OAuth2Provider{
                clock:            clock2.RealClock{},
                issuer:           issuer,
-               store:            keyringStore,
                defaultTransport: transport,
+               flow:             flow,
        }
 
-       err = p.loadGrant()
+       err = p.initCache()
        if err != nil {
                return nil, err
        }
@@ -150,24 +117,8 @@ func NewAuthenticationOAuth2WithParams(
        return p, nil
 }
 
-func (o *OAuth2Provider) loadGrant() error {
-       grant, err := o.store.LoadGrant(o.issuer.Audience)
-       if err != nil {
-               if err == store.ErrNoAuthenticationData {
-                       return errors.New("oauth2 login required")
-               }
-               return err
-       }
-       return o.initCache(grant)
-}
-
-func (o *OAuth2Provider) initCache(grant *oauth2.AuthorizationGrant) error {
-       refresher, err := o.getRefresher(grant.Type)
-       if err != nil {
-               return err
-       }
-
-       source, err := cache.NewDefaultTokenCache(o.store, o.issuer.Audience, 
refresher)
+func (o *OAuth2Provider) initCache() error {
+       source, err := cache.NewDefaultTokenCache(o.issuer.Audience, o.flow)
        if err != nil {
                return err
        }
@@ -194,17 +145,6 @@ func (o *OAuth2Provider) Transport() http.RoundTripper {
        return o.tokenTransport
 }
 
-func (o *OAuth2Provider) getRefresher(t oauth2.AuthorizationGrantType) 
(oauth2.AuthorizationGrantRefresher, error) {
-       switch t {
-       case oauth2.GrantTypeClientCredentials:
-               return oauth2.NewDefaultClientCredentialsGrantRefresher(o.clock)
-       case oauth2.GrantTypeDeviceCode:
-               return 
oauth2.NewDefaultDeviceAuthorizationGrantRefresher(o.clock)
-       default:
-               return nil, store.ErrUnsupportedAuthData
-       }
-}
-
 type transport struct {
        source  cache.CachingTokenSource
        wrapped *xoauth2.Transport
@@ -231,32 +171,3 @@ func (t *transport) RoundTrip(req *http.Request) 
(*http.Response, error) {
 }
 
 func (t *transport) WrappedRoundTripper() http.RoundTripper { return 
t.wrapped.Base }
-
-const (
-       serviceName  = "pulsar"
-       keyChainName = "pulsarctl"
-)
-
-func MakeKeyringStore() (store.Store, error) {
-       kr, err := makeKeyring()
-       if err != nil {
-               return nil, err
-       }
-       return store.NewKeyringStore(kr)
-}
-
-func makeKeyring() (keyring.Keyring, error) {
-       return keyring.Open(keyring.Config{
-               AllowedBackends:          keyring.AvailableBackends(),
-               ServiceName:              serviceName,
-               KeychainName:             keyChainName,
-               KeychainTrustApplication: true,
-               FileDir: filepath.Join(fmt.Sprintf(
-                       "%s/.config/pulsar", utils.GetConfigPath()), 
"credentials"),
-               FilePasswordFunc: keyringPrompt,
-       })
-}
-
-func keyringPrompt(_ string) (string, error) {
-       return "", nil
-}
diff --git a/pulsaradmin/pkg/admin/auth/oauth2_test.go 
b/pulsaradmin/pkg/admin/auth/oauth2_test.go
index b19133c7..65704ab8 100644
--- a/pulsaradmin/pkg/admin/auth/oauth2_test.go
+++ b/pulsaradmin/pkg/admin/auth/oauth2_test.go
@@ -25,7 +25,6 @@ import (
        "testing"
 
        "github.com/apache/pulsar-client-go/oauth2"
-       "github.com/apache/pulsar-client-go/oauth2/store"
        "github.com/pkg/errors"
        "github.com/stretchr/testify/assert"
 )
@@ -99,37 +98,20 @@ func TestOauth2(t *testing.T) {
                Audience:       server.URL,
        }
 
-       memoryStore := store.NewMemoryStore()
-       err = saveGrant(memoryStore, kf, issuer.Audience)
+       auth, err := NewAuthenticationOAuth2WithFlow(issuer, 
oauth2.ClientCredentialsFlowOptions{
+               KeyFile: kf,
+       })
        if err != nil {
                t.Fatal(err)
        }
 
-       auth, err := NewAuthenticationOAuth2(issuer, memoryStore)
-       if err != nil {
-               t.Fatal(err)
+       provider, ok := auth.(*OAuth2Provider)
+       if !ok {
+               t.Fatal("unexpected provider type")
        }
-
-       token, err := auth.source.Token()
+       token, err := provider.source.Token()
        if err != nil {
                t.Fatal(err)
        }
        assert.Equal(t, "token-content", token.AccessToken)
 }
-
-func saveGrant(store store.Store, keyFile, audience string) error {
-       flow, err := 
oauth2.NewDefaultClientCredentialsFlow(oauth2.ClientCredentialsFlowOptions{
-               KeyFile:          keyFile,
-               AdditionalScopes: nil,
-       })
-       if err != nil {
-               return err
-       }
-
-       grant, err := flow.Authorize(audience)
-       if err != nil {
-               return err
-       }
-
-       return store.SaveGrant(audience, *grant)
-}


Reply via email to