This is an automated email from the ASF dual-hosted git repository.

nkurihar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 05c1e32c Make ZTS proxy configurable in athenz auth plugin (#1360)
05c1e32c is described below

commit 05c1e32c0abe23686c155d06f233272b317bd711
Author: Masahiro Sakamoto <[email protected]>
AuthorDate: Fri May 9 12:26:22 2025 +0900

    Make ZTS proxy configurable in athenz auth plugin (#1360)
---
 pulsar/auth/athenz.go      |  7 +++-
 pulsar/auth/athenz_test.go | 98 ++++++++++++++++++++++++++++++++++++++++++----
 2 files changed, 96 insertions(+), 9 deletions(-)

diff --git a/pulsar/auth/athenz.go b/pulsar/auth/athenz.go
index 0b9be971..e259a655 100644
--- a/pulsar/auth/athenz.go
+++ b/pulsar/auth/athenz.go
@@ -50,6 +50,7 @@ type athenzAuthProvider struct {
        principalHeader         string
        roleHeader              string
        ztsURL                  string
+       ztsProxyURL             string
        tokenBuilder            zms.TokenBuilder
        roleToken               zts.RoleToken
        zmsNewTokenBuilder      func(domain, name string, privateKeyPEM []byte, keyVersion string) (zms.TokenBuilder, error)
@@ -78,6 +79,7 @@ func NewAuthenticationAthenzWithParams(params map[string]string) (Provider, erro
                params["principalHeader"],
                params["roleHeader"],
                params["ztsUrl"],
+               params["ztsProxyUrl"],
        ), nil
 }
 
@@ -91,7 +93,8 @@ func NewAuthenticationAthenz(
        caCert string,
        principalHeader string,
        roleHeader string,
-       ztsURL string) Provider {
+       ztsURL string,
+       ztsProxyURL string) Provider {
        fixedKeyID := defaultKeyID
        if keyID != "" {
                fixedKeyID = keyID
@@ -121,6 +124,7 @@ func NewAuthenticationAthenz(
                principalHeader:         principalHeader,
                roleHeader:              fixedRoleHeader,
                ztsURL:                  strings.TrimSuffix(ztsURL, "/"),
+               ztsProxyURL:             ztsProxyURL,
                zmsNewTokenBuilder:      zms.NewTokenBuilder,
                ztsNewRoleToken:         ztsNewRoleToken,
                ztsNewRoleTokenFromCert: ztsNewRoleTokenFromCert,
@@ -135,6 +139,7 @@ func (p *athenzAuthProvider) Init() error {
        var roleToken zts.RoleToken
        opts := zts.RoleTokenOptions{
                BaseZTSURL:       p.ztsURL + "/zts/v1",
+               ProxyURL:         p.ztsProxyURL,
                MinExpire:        minExpire,
                MaxExpire:        maxExpire,
                PrefetchInterval: prefetchInterval,
diff --git a/pulsar/auth/athenz_test.go b/pulsar/auth/athenz_test.go
index 4e939151..84d90d26 100644
--- a/pulsar/auth/athenz_test.go
+++ b/pulsar/auth/athenz_test.go
@@ -118,7 +118,11 @@ func MockZtsNewRoleToken(tok zms.Token, domain string, opts zts.RoleTokenOptions
        }
 
        mockRoleToken := new(MockRoleToken)
-       mockRoleToken.On("RoleTokenValue").Return("mockRoleToken", nil)
+       if opts.ProxyURL == "" {
+               mockRoleToken.On("RoleTokenValue").Return("mockRoleToken", nil)
+       } else {
+               mockRoleToken.On("RoleTokenValue").Return("mockRoleToken-"+opts.ProxyURL, nil)
+       }
        mockRoleToken.On("StartPrefetcher").Return(nil)
        mockRoleToken.On("StopPrefetcher").Return(nil)
        return mockRoleToken
@@ -136,7 +140,11 @@ func MockZtsNewRoleTokenFromCert(certFile, keyFile, domain string, opts zts.Role
        }
 
        mockRoleToken := new(MockRoleToken)
-       mockRoleToken.On("RoleTokenValue").Return("mockRoleTokenFromCert", nil)
+       if opts.ProxyURL == "" {
+               mockRoleToken.On("RoleTokenValue").Return("mockRoleTokenFromCert", nil)
+       } else {
+               mockRoleToken.On("RoleTokenValue").Return("mockRoleTokenFromCert-"+opts.ProxyURL, nil)
+       }
        mockRoleToken.On("StartPrefetcher").Return(nil)
        mockRoleToken.On("StopPrefetcher").Return(nil)
        return mockRoleToken
@@ -154,7 +162,8 @@ func TestAthenzAuth(t *testing.T) {
                "",                      // caCert
                "",                      // principalHeader
                "",                      // roleHeader
-               "http://localhost:9999") // ztsURL
+               "http://localhost:9999", // ztsURL
+               "")                      // ztsProxyURL
 
        // inject mock function
        athenz := provider.(*athenzAuthProvider)
@@ -174,6 +183,39 @@ func TestAthenzAuth(t *testing.T) {
        assert.False(t, athenz.roleToken.(*MockRoleToken).isPrefetcherStarted)
 }
 
+func TestAthenzAuthWithProxy(t *testing.T) {
+       privateKey := "file://" + clientKeyPath
+       provider := NewAuthenticationAthenz(
+               "pulsar.test.provider",  // providerDomain
+               "pulsar.test.tenant",    // tenantDomain
+               "service",               // tenantService
+               privateKey,              // privateKey
+               "",                      // keyID
+               "",                      // x509CertChain
+               "",                      // caCert
+               "",                      // principalHeader
+               "",                      // roleHeader
+               "http://localhost:9999", // ztsURL
+               "http://localhost:8080") // ztsProxyURL
+
+       // inject mock function
+       athenz := provider.(*athenzAuthProvider)
+       athenz.zmsNewTokenBuilder = MockZmsNewTokenBuilder
+       athenz.ztsNewRoleToken = MockZtsNewRoleToken
+
+       err := athenz.Init()
+       assert.NoError(t, err)
+       assert.True(t, athenz.roleToken.(*MockRoleToken).isPrefetcherStarted)
+
+       data, err := athenz.GetData()
+       assert.Equal(t, []byte("mockRoleToken-http://localhost:8080"), data)
+       assert.NoError(t, err)
+
+       err = athenz.Close()
+       assert.NoError(t, err)
+       assert.False(t, athenz.roleToken.(*MockRoleToken).isPrefetcherStarted)
+}
+
 func TestCopperArgos(t *testing.T) {
        privateKey := "file://" + clientKeyPath
        x509CertChain := "file://" + clientCertPath
@@ -189,7 +231,8 @@ func TestCopperArgos(t *testing.T) {
                caCert,                  // caCert
                "",                      // principalHeader
                "",                      // roleHeader
-               "http://localhost:9999") // ztsURL
+               "http://localhost:9999", // ztsURL
+               "")                      // ztsProxyURL
 
        // inject mock function
        athenz := provider.(*athenzAuthProvider)
@@ -208,6 +251,41 @@ func TestCopperArgos(t *testing.T) {
        assert.False(t, athenz.roleToken.(*MockRoleToken).isPrefetcherStarted)
 }
 
+func TestCopperArgosWithProxy(t *testing.T) {
+       privateKey := "file://" + clientKeyPath
+       x509CertChain := "file://" + clientCertPath
+       caCert := "file://" + caCertPath
+
+       provider := NewAuthenticationAthenz(
+               "pulsar.test.provider",  // providerDomain
+               "",                      // tenantDomain
+               "",                      // tenantService
+               privateKey,              // privateKey
+               "",                      // keyID
+               x509CertChain,           // x509CertChain
+               caCert,                  // caCert
+               "",                      // principalHeader
+               "",                      // roleHeader
+               "http://localhost:9999", // ztsURL
+               "http://localhost:8080") // ztsProxyURL
+
+       // inject mock function
+       athenz := provider.(*athenzAuthProvider)
+       athenz.ztsNewRoleTokenFromCert = MockZtsNewRoleTokenFromCert
+
+       err := athenz.Init()
+       assert.NoError(t, err)
+       assert.True(t, athenz.roleToken.(*MockRoleToken).isPrefetcherStarted)
+
+       data, err := athenz.GetData()
+       assert.Equal(t, []byte("mockRoleTokenFromCert-http://localhost:8080"), data)
+       assert.NoError(t, err)
+
+       err = athenz.Close()
+       assert.NoError(t, err)
+       assert.False(t, athenz.roleToken.(*MockRoleToken).isPrefetcherStarted)
+}
+
 func TestIllegalParams(t *testing.T) {
        privateKey := "file://" + clientKeyPath
        x509CertChain := "file://" + clientCertPath
@@ -222,7 +300,8 @@ func TestIllegalParams(t *testing.T) {
                "",                      // caCert
                "",                      // principalHeader
                "",                      // roleHeader
-               "http://localhost:9999") // ztsURL
+               "http://localhost:9999", // ztsURL
+               "")                      // ztsProxyURL
        athenz := provider.(*athenzAuthProvider)
 
        err := athenz.Init()
@@ -239,7 +318,8 @@ func TestIllegalParams(t *testing.T) {
                "",                      // caCert
                "",                      // principalHeader
                "",                      // roleHeader
-               "http://localhost:9999") // ztsURL
+               "http://localhost:9999", // ztsURL
+               "")                      // ztsProxyURL
        athenz = provider.(*athenzAuthProvider)
 
        err = athenz.Init()
@@ -256,7 +336,8 @@ func TestIllegalParams(t *testing.T) {
                "",                      // caCert
                "",                      // principalHeader
                "",                      // roleHeader
-               "http://localhost:9999") // ztsURL
+               "http://localhost:9999", // ztsURL
+               "")                      // ztsProxyURL
        athenz = provider.(*athenzAuthProvider)
 
        err = athenz.Init()
@@ -273,7 +354,8 @@ func TestIllegalParams(t *testing.T) {
                "",                      // caCert
                "",                      // principalHeader
                "",                      // roleHeader
-               "http://localhost:9999") // ztsURL
+               "http://localhost:9999", // ztsURL
+               "")                      // ztsProxyURL
        athenz = provider.(*athenzAuthProvider)
 
        err = athenz.Init()

Reply via email to