This is an automated email from the ASF dual-hosted git repository.

rfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 203dcf1c feat(oauth2): add support for issuer URL override in client 
credentials flow (#1463)
203dcf1c is described below

commit 203dcf1cd22dbcf853c2a06c52dee3c4e1eba92b
Author: Rui Fu <[email protected]>
AuthorDate: Thu Jan 29 22:00:28 2026 +0800

    feat(oauth2): add support for issuer URL override in client credentials 
flow (#1463)
    
    * feat(oauth2): add support for issuer URL override in client credentials 
flow
    
    * fix(oauth2): handle nil options in client credentials flow
    
    * test(oauth2): add tests for DefaultGrantProvider behavior
---
 oauth2/client_credentials_flow.go         | 14 ++++-
 oauth2/client_credentials_flow_test.go    | 91 +++++++++++++++++++++++++++++++
 pulsar/auth/oauth2.go                     |  1 +
 pulsar/auth/oauth2_test.go                | 84 +++++++++++++++++++++++++++-
 pulsaradmin/pkg/admin/auth/oauth2.go      |  8 ++-
 pulsaradmin/pkg/admin/auth/oauth2_test.go | 32 ++++++++++-
 6 files changed, 225 insertions(+), 5 deletions(-)

diff --git a/oauth2/client_credentials_flow.go 
b/oauth2/client_credentials_flow.go
index c45dc821..87cebc6b 100644
--- a/oauth2/client_credentials_flow.go
+++ b/oauth2/client_credentials_flow.go
@@ -53,6 +53,7 @@ type GrantProvider interface {
 
 type ClientCredentialsFlowOptions struct {
        KeyFile          string
+       IssuerURL        string
        AdditionalScopes []string
 }
 
@@ -64,13 +65,24 @@ type DefaultGrantProvider struct {
 // merging the scopes from both the options and the key file configuration
 func (p *DefaultGrantProvider) GetGrant(audience string, options 
*ClientCredentialsFlowOptions) (
        *AuthorizationGrant, error) {
+       if options == nil {
+               return nil, errors.New("client credentials flow options cannot 
be nil")
+       }
        credsProvider := 
NewClientCredentialsProviderFromKeyFile(options.KeyFile)
        keyFile, err := credsProvider.GetClientCredentials()
        if err != nil {
                return nil, errors.Wrap(err, "could not get client credentials")
        }
 
-       wellKnownEndpoints, err := 
GetOIDCWellKnownEndpointsFromIssuerURL(keyFile.IssuerURL)
+       issuerURL := options.IssuerURL
+       if issuerURL == "" {
+               issuerURL = keyFile.IssuerURL
+       }
+       if issuerURL == "" {
+               return nil, errors.New("issuer url is required for client 
credentials flow")
+       }
+
+       wellKnownEndpoints, err := 
GetOIDCWellKnownEndpointsFromIssuerURL(issuerURL)
        if err != nil {
                return nil, err
        }
diff --git a/oauth2/client_credentials_flow_test.go 
b/oauth2/client_credentials_flow_test.go
index 97ad8710..d1a1c592 100644
--- a/oauth2/client_credentials_flow_test.go
+++ b/oauth2/client_credentials_flow_test.go
@@ -19,6 +19,10 @@ package oauth2
 
 import (
        "errors"
+       "fmt"
+       "net/http"
+       "net/http/httptest"
+       "os"
        "time"
 
        "github.com/apache/pulsar-client-go/oauth2/clock"
@@ -49,6 +53,57 @@ var clientCredentials = KeyFile{
        Scope:        "test_scope",
 }
 
+func mockWellKnownServer(tokenEndpoint string) *httptest.Server {
+       handler := http.NewServeMux()
+       handler.HandleFunc("/.well-known/openid-configuration", func(writer 
http.ResponseWriter, _ *http.Request) {
+               fmt.Fprintf(writer, "{\n  \"token_endpoint\": \"%s\"\n}\n", 
tokenEndpoint)
+       })
+       return httptest.NewServer(handler)
+}
+
+func mockKeyFileWithIssuer(issuerURL string) (string, error) {
+       kf, err := os.CreateTemp("", "test_oauth2")
+       if err != nil {
+               return "", err
+       }
+       _, err = kf.WriteString(fmt.Sprintf(`{
+  "type":"resource",
+  "client_id":"client-id",
+  "client_secret":"client-secret",
+  "client_email":"[email protected]",
+  "issuer_url":"%s"
+}`, issuerURL))
+       if err != nil {
+               _ = kf.Close()
+               return "", err
+       }
+       if err := kf.Close(); err != nil {
+               return "", err
+       }
+       return kf.Name(), nil
+}
+
+func mockKeyFileWithoutIssuer() (string, error) {
+       kf, err := os.CreateTemp("", "test_oauth2")
+       if err != nil {
+               return "", err
+       }
+       _, err = kf.WriteString(`{
+  "type":"resource",
+  "client_id":"client-id",
+  "client_secret":"client-secret",
+  "client_email":"[email protected]"
+}`)
+       if err != nil {
+               _ = kf.Close()
+               return "", err
+       }
+       if err := kf.Close(); err != nil {
+               return "", err
+       }
+       return kf.Name(), nil
+}
+
 var _ = ginkgo.Describe("ClientCredentialsFlow", func() {
        ginkgo.Describe("Authorize", func() {
 
@@ -124,6 +179,42 @@ var _ = ginkgo.Describe("ClientCredentialsFlow", func() {
        })
 })
 
+var _ = ginkgo.Describe("DefaultGrantProvider", func() {
+       ginkgo.It("prefers issuer url from options over key file", func() {
+               keyFileTokenEndpoint := "http://keyfile.example/token";
+               optionsTokenEndpoint := "http://options.example/token";
+               serverFromKeyFile := mockWellKnownServer(keyFileTokenEndpoint)
+               defer serverFromKeyFile.Close()
+               serverFromOptions := mockWellKnownServer(optionsTokenEndpoint)
+               defer serverFromOptions.Close()
+
+               keyFile, err := mockKeyFileWithIssuer(serverFromKeyFile.URL)
+               gomega.Expect(err).ToNot(gomega.HaveOccurred())
+               defer os.Remove(keyFile)
+
+               provider := DefaultGrantProvider{}
+               grant, err := provider.GetGrant("test-audience", 
&ClientCredentialsFlowOptions{
+                       KeyFile:   keyFile,
+                       IssuerURL: serverFromOptions.URL,
+               })
+               gomega.Expect(err).ToNot(gomega.HaveOccurred())
+               
gomega.Expect(grant.TokenEndpoint).To(gomega.Equal(optionsTokenEndpoint))
+       })
+
+       ginkgo.It("returns an error when issuer url is missing", func() {
+               keyFile, err := mockKeyFileWithoutIssuer()
+               gomega.Expect(err).ToNot(gomega.HaveOccurred())
+               defer os.Remove(keyFile)
+
+               provider := DefaultGrantProvider{}
+               _, err = provider.GetGrant("test-audience", 
&ClientCredentialsFlowOptions{
+                       KeyFile: keyFile,
+               })
+               gomega.Expect(err).To(gomega.HaveOccurred())
+               gomega.Expect(err.Error()).To(gomega.Equal("issuer url is 
required for client credentials flow"))
+       })
+})
+
 var _ = ginkgo.Describe("ClientCredentialsGrantRefresher", func() {
 
        ginkgo.Describe("Refresh", func() {
diff --git a/pulsar/auth/oauth2.go b/pulsar/auth/oauth2.go
index e35a15b2..42edd178 100644
--- a/pulsar/auth/oauth2.go
+++ b/pulsar/auth/oauth2.go
@@ -61,6 +61,7 @@ func NewAuthenticationOAuth2WithParams(params 
map[string]string) (Provider, erro
        case ConfigParamTypeClientCredentials:
                flow, err := 
oauth2.NewDefaultClientCredentialsFlow(oauth2.ClientCredentialsFlowOptions{
                        KeyFile:          params[ConfigParamKeyFile],
+                       IssuerURL:        params[ConfigParamIssuerURL],
                        AdditionalScopes: 
strings.Split(params[ConfigParamScope], " "),
                })
                if err != nil {
diff --git a/pulsar/auth/oauth2_test.go b/pulsar/auth/oauth2_test.go
index ecaad51f..7dbb479d 100644
--- a/pulsar/auth/oauth2_test.go
+++ b/pulsar/auth/oauth2_test.go
@@ -36,6 +36,11 @@ var expectedClientSecret atomic.Value
 
 // mockOAuthServer will mock a oauth service for the tests
 func mockOAuthServer() *httptest.Server {
+       return mockOAuthServerWithToken("token-content")
+}
+
+// mockOAuthServerWithToken will mock a oauth service for the tests with a 
custom token.
+func mockOAuthServerWithToken(token string) *httptest.Server {
        // prepare a port for the mocked server
        server := httptest.NewUnstartedServer(http.DefaultServeMux)
 
@@ -61,7 +66,7 @@ func mockOAuthServer() *httptest.Server {
                        http.Error(writer, "invalid client credentials", 
http.StatusUnauthorized)
                        return
                }
-               fmt.Fprintln(writer, "{\n  \"access_token\": 
\"token-content\",\n  \"token_type\": \"Bearer\"\n}")
+               fmt.Fprintf(writer, "{\n  \"access_token\": \"%s\",\n  
\"token_type\": \"Bearer\"\n}\n", token)
        })
        mockedHandler.HandleFunc("/authorize", func(writer http.ResponseWriter, 
_ *http.Request) {
                fmt.Fprintln(writer, "true")
@@ -98,6 +103,29 @@ func mockKeyFile(server string) (string, error) {
        return kf.Name(), nil
 }
 
+func mockKeyFileWithoutIssuer() (string, error) {
+       pwd, err := os.Getwd()
+       if err != nil {
+               return "", err
+       }
+       kf, err := os.CreateTemp(pwd, "test_oauth2")
+       if err != nil {
+               return "", err
+       }
+       _, err = kf.WriteString(`{
+  "type":"resource",
+  "client_id":"client-id",
+  "client_secret":"client-secret",
+  "client_email":"[email protected]",
+  "scope": "test-scope"
+}`)
+       if err != nil {
+               return "", err
+       }
+
+       return kf.Name(), nil
+}
+
 func TestNewAuthenticationOAuth2WithParams(t *testing.T) {
        server := mockOAuthServer()
        defer server.Close()
@@ -162,6 +190,60 @@ func TestNewAuthenticationOAuth2WithParams(t *testing.T) {
        }
 }
 
+func TestOAuth2IssuerOverrideUsesAuthParams(t *testing.T) {
+       expectedClientID.Store("client-id")
+       expectedClientSecret.Store("client-secret")
+       serverFromKeyFile := mockOAuthServerWithToken("token-from-keyfile")
+       defer serverFromKeyFile.Close()
+       serverFromParams := mockOAuthServerWithToken("token-from-params")
+       defer serverFromParams.Close()
+
+       kf, err := mockKeyFile(serverFromKeyFile.URL)
+       defer os.Remove(kf)
+       require.NoError(t, err)
+
+       params := map[string]string{
+               ConfigParamType:      ConfigParamTypeClientCredentials,
+               ConfigParamIssuerURL: serverFromParams.URL,
+               ConfigParamClientID:  "client-id",
+               ConfigParamAudience:  "audience",
+               ConfigParamKeyFile:   kf,
+               ConfigParamScope:     "profile",
+       }
+
+       auth, err := NewAuthenticationOAuth2WithParams(params)
+       require.NoError(t, err)
+       require.NoError(t, auth.Init())
+
+       token, err := auth.GetData()
+       require.NoError(t, err)
+       assert.Equal(t, "token-from-params", string(token))
+}
+
+func TestOAuth2MissingIssuerReturnsError(t *testing.T) {
+       expectedClientID.Store("client-id")
+       expectedClientSecret.Store("client-secret")
+       kf, err := mockKeyFileWithoutIssuer()
+       defer os.Remove(kf)
+       require.NoError(t, err)
+
+       params := map[string]string{
+               ConfigParamType:     ConfigParamTypeClientCredentials,
+               ConfigParamClientID: "client-id",
+               ConfigParamAudience: "audience",
+               ConfigParamKeyFile:  kf,
+               ConfigParamScope:    "profile",
+       }
+
+       auth, err := NewAuthenticationOAuth2WithParams(params)
+       require.NoError(t, err)
+       require.NoError(t, auth.Init())
+
+       _, err = auth.GetData()
+       require.Error(t, err)
+       assert.Contains(t, err.Error(), "issuer url is required for client 
credentials flow")
+}
+
 func TestOAuth2KeyFileReloading(t *testing.T) {
        server := mockOAuthServer()
        defer server.Close()
diff --git a/pulsaradmin/pkg/admin/auth/oauth2.go 
b/pulsaradmin/pkg/admin/auth/oauth2.go
index 9536f09a..f310f135 100644
--- a/pulsaradmin/pkg/admin/auth/oauth2.go
+++ b/pulsaradmin/pkg/admin/auth/oauth2.go
@@ -52,7 +52,8 @@ type OAuth2Provider struct {
 // NewAuthenticationOAuth2WithDefaultFlow uses memory to save the grant
 func NewAuthenticationOAuth2WithDefaultFlow(issuer oauth2.Issuer, keyFile 
string) (Provider, error) {
        return NewAuthenticationOAuth2WithFlow(issuer, 
oauth2.ClientCredentialsFlowOptions{
-               KeyFile: keyFile,
+               KeyFile:   keyFile,
+               IssuerURL: issuer.IssuerEndpoint,
        })
 }
 
@@ -97,7 +98,10 @@ func NewAuthenticationOAuth2WithParams(
                Audience:       audience,
        }
 
-       flow, err := 
oauth2.NewDefaultClientCredentialsFlow(oauth2.ClientCredentialsFlowOptions{KeyFile:
 privateKey})
+       flow, err := 
oauth2.NewDefaultClientCredentialsFlow(oauth2.ClientCredentialsFlowOptions{
+               KeyFile:   privateKey,
+               IssuerURL: issuerEndpoint,
+       })
        if err != nil {
                return nil, err
        }
diff --git a/pulsaradmin/pkg/admin/auth/oauth2_test.go 
b/pulsaradmin/pkg/admin/auth/oauth2_test.go
index 65704ab8..3f3ea32f 100644
--- a/pulsaradmin/pkg/admin/auth/oauth2_test.go
+++ b/pulsaradmin/pkg/admin/auth/oauth2_test.go
@@ -27,10 +27,16 @@ import (
        "github.com/apache/pulsar-client-go/oauth2"
        "github.com/pkg/errors"
        "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
 )
 
 // mockOAuthServer will mock a oauth service for the tests
 func mockOAuthServer() *httptest.Server {
+       return mockOAuthServerWithToken("token-content")
+}
+
+// mockOAuthServerWithToken will mock a oauth service for the tests with a 
custom token.
+func mockOAuthServerWithToken(token string) *httptest.Server {
        // prepare a port for the mocked server
        server := httptest.NewUnstartedServer(http.DefaultServeMux)
 
@@ -46,7 +52,7 @@ func mockOAuthServer() *httptest.Server {
                fmt.Fprintln(writer, s)
        })
        mockedHandler.HandleFunc("/oauth/token", func(writer 
http.ResponseWriter, _ *http.Request) {
-               fmt.Fprintln(writer, "{\n  \"access_token\": 
\"token-content\",\n  \"token_type\": \"Bearer\"\n}")
+               fmt.Fprintf(writer, "{\n  \"access_token\": \"%s\",\n  
\"token_type\": \"Bearer\"\n}\n", token)
        })
        mockedHandler.HandleFunc("/authorize", func(writer http.ResponseWriter, 
_ *http.Request) {
                fmt.Fprintln(writer, "true")
@@ -115,3 +121,27 @@ func TestOauth2(t *testing.T) {
        }
        assert.Equal(t, "token-content", token.AccessToken)
 }
+
+func TestOAuth2IssuerOverrideUsesAuthParams(t *testing.T) {
+       serverFromKeyFile := mockOAuthServerWithToken("token-from-keyfile")
+       defer serverFromKeyFile.Close()
+       serverFromParams := mockOAuthServerWithToken("token-from-params")
+       defer serverFromParams.Close()
+
+       kf, err := mockKeyFile(serverFromKeyFile.URL)
+       defer os.Remove(kf)
+       require.NoError(t, err)
+
+       provider, err := NewAuthenticationOAuth2WithParams(
+               serverFromParams.URL,
+               "client-id",
+               serverFromParams.URL,
+               kf,
+               http.DefaultTransport,
+       )
+       require.NoError(t, err)
+
+       token, err := provider.source.Token()
+       require.NoError(t, err)
+       assert.Equal(t, "token-from-params", token.AccessToken)
+}

Reply via email to