This is an automated email from the ASF dual-hosted git repository.
nkurihar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 05c1e32c Make ZTS proxy configurable in athenz auth plugin (#1360)
05c1e32c is described below
commit 05c1e32c0abe23686c155d06f233272b317bd711
Author: Masahiro Sakamoto <[email protected]>
AuthorDate: Fri May 9 12:26:22 2025 +0900
Make ZTS proxy configurable in athenz auth plugin (#1360)
---
pulsar/auth/athenz.go | 7 +++-
pulsar/auth/athenz_test.go | 98 ++++++++++++++++++++++++++++++++++++++++++----
2 files changed, 96 insertions(+), 9 deletions(-)
diff --git a/pulsar/auth/athenz.go b/pulsar/auth/athenz.go
index 0b9be971..e259a655 100644
--- a/pulsar/auth/athenz.go
+++ b/pulsar/auth/athenz.go
@@ -50,6 +50,7 @@ type athenzAuthProvider struct {
principalHeader string
roleHeader string
ztsURL string
+ ztsProxyURL string
tokenBuilder zms.TokenBuilder
roleToken zts.RoleToken
zmsNewTokenBuilder func(domain, name string, privateKeyPEM []byte,
keyVersion string) (zms.TokenBuilder, error)
@@ -78,6 +79,7 @@ func NewAuthenticationAthenzWithParams(params
map[string]string) (Provider, erro
params["principalHeader"],
params["roleHeader"],
params["ztsUrl"],
+ params["ztsProxyUrl"],
), nil
}
@@ -91,7 +93,8 @@ func NewAuthenticationAthenz(
caCert string,
principalHeader string,
roleHeader string,
- ztsURL string) Provider {
+ ztsURL string,
+ ztsProxyURL string) Provider {
fixedKeyID := defaultKeyID
if keyID != "" {
fixedKeyID = keyID
@@ -121,6 +124,7 @@ func NewAuthenticationAthenz(
principalHeader: principalHeader,
roleHeader: fixedRoleHeader,
ztsURL: strings.TrimSuffix(ztsURL, "/"),
+ ztsProxyURL: ztsProxyURL,
zmsNewTokenBuilder: zms.NewTokenBuilder,
ztsNewRoleToken: ztsNewRoleToken,
ztsNewRoleTokenFromCert: ztsNewRoleTokenFromCert,
@@ -135,6 +139,7 @@ func (p *athenzAuthProvider) Init() error {
var roleToken zts.RoleToken
opts := zts.RoleTokenOptions{
BaseZTSURL: p.ztsURL + "/zts/v1",
+ ProxyURL: p.ztsProxyURL,
MinExpire: minExpire,
MaxExpire: maxExpire,
PrefetchInterval: prefetchInterval,
diff --git a/pulsar/auth/athenz_test.go b/pulsar/auth/athenz_test.go
index 4e939151..84d90d26 100644
--- a/pulsar/auth/athenz_test.go
+++ b/pulsar/auth/athenz_test.go
@@ -118,7 +118,11 @@ func MockZtsNewRoleToken(tok zms.Token, domain string,
opts zts.RoleTokenOptions
}
mockRoleToken := new(MockRoleToken)
- mockRoleToken.On("RoleTokenValue").Return("mockRoleToken", nil)
+ if opts.ProxyURL == "" {
+ mockRoleToken.On("RoleTokenValue").Return("mockRoleToken", nil)
+ } else {
+
mockRoleToken.On("RoleTokenValue").Return("mockRoleToken-"+opts.ProxyURL, nil)
+ }
mockRoleToken.On("StartPrefetcher").Return(nil)
mockRoleToken.On("StopPrefetcher").Return(nil)
return mockRoleToken
@@ -136,7 +140,11 @@ func MockZtsNewRoleTokenFromCert(certFile, keyFile, domain
string, opts zts.Role
}
mockRoleToken := new(MockRoleToken)
- mockRoleToken.On("RoleTokenValue").Return("mockRoleTokenFromCert", nil)
+ if opts.ProxyURL == "" {
+
mockRoleToken.On("RoleTokenValue").Return("mockRoleTokenFromCert", nil)
+ } else {
+
mockRoleToken.On("RoleTokenValue").Return("mockRoleTokenFromCert-"+opts.ProxyURL,
nil)
+ }
mockRoleToken.On("StartPrefetcher").Return(nil)
mockRoleToken.On("StopPrefetcher").Return(nil)
return mockRoleToken
@@ -154,7 +162,8 @@ func TestAthenzAuth(t *testing.T) {
"", // caCert
"", // principalHeader
"", // roleHeader
- "http://localhost:9999") // ztsURL
+ "http://localhost:9999", // ztsURL
+ "") // ztsProxyURL
// inject mock function
athenz := provider.(*athenzAuthProvider)
@@ -174,6 +183,39 @@ func TestAthenzAuth(t *testing.T) {
assert.False(t, athenz.roleToken.(*MockRoleToken).isPrefetcherStarted)
}
+func TestAthenzAuthWithProxy(t *testing.T) {
+ privateKey := "file://" + clientKeyPath
+ provider := NewAuthenticationAthenz(
+ "pulsar.test.provider", // providerDomain
+ "pulsar.test.tenant", // tenantDomain
+ "service", // tenantService
+ privateKey, // privateKey
+ "", // keyID
+ "", // x509CertChain
+ "", // caCert
+ "", // principalHeader
+ "", // roleHeader
+ "http://localhost:9999", // ztsURL
+ "http://localhost:8080") // ztsProxyURL
+
+ // inject mock function
+ athenz := provider.(*athenzAuthProvider)
+ athenz.zmsNewTokenBuilder = MockZmsNewTokenBuilder
+ athenz.ztsNewRoleToken = MockZtsNewRoleToken
+
+ err := athenz.Init()
+ assert.NoError(t, err)
+ assert.True(t, athenz.roleToken.(*MockRoleToken).isPrefetcherStarted)
+
+ data, err := athenz.GetData()
+ assert.Equal(t, []byte("mockRoleToken-http://localhost:8080"), data)
+ assert.NoError(t, err)
+
+ err = athenz.Close()
+ assert.NoError(t, err)
+ assert.False(t, athenz.roleToken.(*MockRoleToken).isPrefetcherStarted)
+}
+
func TestCopperArgos(t *testing.T) {
privateKey := "file://" + clientKeyPath
x509CertChain := "file://" + clientCertPath
@@ -189,7 +231,8 @@ func TestCopperArgos(t *testing.T) {
caCert, // caCert
"", // principalHeader
"", // roleHeader
- "http://localhost:9999") // ztsURL
+ "http://localhost:9999", // ztsURL
+ "") // ztsProxyURL
// inject mock function
athenz := provider.(*athenzAuthProvider)
@@ -208,6 +251,41 @@ func TestCopperArgos(t *testing.T) {
assert.False(t, athenz.roleToken.(*MockRoleToken).isPrefetcherStarted)
}
+func TestCopperArgosWithProxy(t *testing.T) {
+ privateKey := "file://" + clientKeyPath
+ x509CertChain := "file://" + clientCertPath
+ caCert := "file://" + caCertPath
+
+ provider := NewAuthenticationAthenz(
+ "pulsar.test.provider", // providerDomain
+ "", // tenantDomain
+ "", // tenantService
+ privateKey, // privateKey
+ "", // keyID
+ x509CertChain, // x509CertChain
+ caCert, // caCert
+ "", // principalHeader
+ "", // roleHeader
+ "http://localhost:9999", // ztsURL
+ "http://localhost:8080") // ztsProxyURL
+
+ // inject mock function
+ athenz := provider.(*athenzAuthProvider)
+ athenz.ztsNewRoleTokenFromCert = MockZtsNewRoleTokenFromCert
+
+ err := athenz.Init()
+ assert.NoError(t, err)
+ assert.True(t, athenz.roleToken.(*MockRoleToken).isPrefetcherStarted)
+
+ data, err := athenz.GetData()
+ assert.Equal(t, []byte("mockRoleTokenFromCert-http://localhost:8080"),
data)
+ assert.NoError(t, err)
+
+ err = athenz.Close()
+ assert.NoError(t, err)
+ assert.False(t, athenz.roleToken.(*MockRoleToken).isPrefetcherStarted)
+}
+
func TestIllegalParams(t *testing.T) {
privateKey := "file://" + clientKeyPath
x509CertChain := "file://" + clientCertPath
@@ -222,7 +300,8 @@ func TestIllegalParams(t *testing.T) {
"", // caCert
"", // principalHeader
"", // roleHeader
- "http://localhost:9999") // ztsURL
+ "http://localhost:9999", // ztsURL
+ "") // ztsProxyURL
athenz := provider.(*athenzAuthProvider)
err := athenz.Init()
@@ -239,7 +318,8 @@ func TestIllegalParams(t *testing.T) {
"", // caCert
"", // principalHeader
"", // roleHeader
- "http://localhost:9999") // ztsURL
+ "http://localhost:9999", // ztsURL
+ "") // ztsProxyURL
athenz = provider.(*athenzAuthProvider)
err = athenz.Init()
@@ -256,7 +336,8 @@ func TestIllegalParams(t *testing.T) {
"", // caCert
"", // principalHeader
"", // roleHeader
- "http://localhost:9999") // ztsURL
+ "http://localhost:9999", // ztsURL
+ "") // ztsProxyURL
athenz = provider.(*athenzAuthProvider)
err = athenz.Init()
@@ -273,7 +354,8 @@ func TestIllegalParams(t *testing.T) {
"", // caCert
"", // principalHeader
"", // roleHeader
- "http://localhost:9999") // ztsURL
+ "http://localhost:9999", // ztsURL
+ "") // ztsProxyURL
athenz = provider.(*athenzAuthProvider)
err = athenz.Init()