This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 0f822ef  [auth] Add Athenz authentication provider (#227)
0f822ef is described below

commit 0f822efff1c1a78a517982aed43d56e48eed3778
Author: Yuri Mizushima <[email protected]>
AuthorDate: Sat Apr 25 22:38:30 2020 +0900

    [auth] Add Athenz authentication provider (#227)
    
    * feat: add athenz authentication provider
    
    * style: fix style errors
---
 go.mod                              |   3 +-
 go.sum                              |  39 ++++++++
 pulsar/client.go                    |   5 +
 pulsar/client_impl.go               |   4 +
 pulsar/internal/auth/athenz.go      | 179 ++++++++++++++++++++++++++++++++++++
 pulsar/internal/auth/athenz_test.go | 130 ++++++++++++++++++++++++++
 pulsar/internal/auth/provider.go    |   8 +-
 7 files changed, 366 insertions(+), 2 deletions(-)

diff --git a/go.mod b/go.mod
index bdbadf1..34bae4e 100644
--- a/go.mod
+++ b/go.mod
@@ -15,5 +15,6 @@ require (
        github.com/spaolacci/murmur3 v1.1.0
        github.com/spf13/cobra v0.0.3
        github.com/spf13/pflag v1.0.3 // indirect
-       github.com/stretchr/testify v1.3.0
+       github.com/stretchr/testify v1.4.0
+       github.com/yahoo/athenz v1.8.55
 )
diff --git a/go.sum b/go.sum
index 3d8be2a..f66879a 100644
--- a/go.sum
+++ b/go.sum
@@ -1,16 +1,27 @@
+github.com/BurntSushi/toml v0.3.1/go.mod 
h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
+github.com/ardielle/ardielle-go v1.5.2 
h1:TilHTpHIQJ27R1Tl/iITBzMwiUGSlVfiVhwDNGM3Zj4=
+github.com/ardielle/ardielle-go v1.5.2/go.mod 
h1:I4hy1n795cUhaVt/ojz83SNVCYIGsAFAONtv2Dr7HUI=
+github.com/ardielle/ardielle-tools v1.5.4/go.mod 
h1:oZN+JRMnqGiIhrzkRN9l26Cej9dEx4jeNG6A+AdkShk=
 github.com/beefsack/go-rate v0.0.0-20180408011153-efa7637bb9b6 
h1:KXlsf+qt/X5ttPGEjR0tPH1xaWWoKBEg9Q1THAj2h3I=
 github.com/beefsack/go-rate v0.0.0-20180408011153-efa7637bb9b6/go.mod 
h1:6YNgTHLutezwnBvyneBbwvB8C82y3dcoOj5EQJIdGXA=
 github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b 
h1:AP/Y7sqYicnjGDfD5VcY4CIfh1hRXBUavxrvELjTiOE=
 github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b/go.mod 
h1:ac9efd0D1fsDb3EJvhqgXRbFx7bs2wqZ10HQPeU8U/Q=
+github.com/boynton/repl v0.0.0-20170116235056-348863958e3e/go.mod 
h1:Crc/GCZ3NXDVCio7Yr0o+SSrytpcFhLmVCIzi0s49t4=
 github.com/davecgh/go-spew v1.1.0/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 
h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/dimfeld/httptreemux v5.0.1+incompatible 
h1:Qj3gVcDNoOthBAqftuD596rm4wg/adLLz5xh5CmpiCA=
+github.com/dimfeld/httptreemux v5.0.1+incompatible/go.mod 
h1:rbUlSV+CCpv/SuqUTP/8Bk2O3LyUV436/yaRGkhP6Z0=
 github.com/golang/protobuf v1.3.1 
h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg=
 github.com/golang/protobuf v1.3.1/go.mod 
h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
 github.com/google/go-cmp v0.3.0 h1:crn/baboCvb5fXaQ0IJ1SGTsTVrWpDsCWC8EGETZijY=
 github.com/google/go-cmp v0.3.0/go.mod 
h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
+github.com/gorilla/context v1.1.1/go.mod 
h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg=
+github.com/gorilla/mux v1.7.3/go.mod 
h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
 github.com/inconshreveable/mousetrap v1.0.0 
h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
 github.com/inconshreveable/mousetrap v1.0.0/go.mod 
h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
+github.com/jawher/mow.cli v1.0.4/go.mod 
h1:5hQj2V8g+qYmLUVWqu4Wuja1pI57M83EChYLVZ0sMKk=
+github.com/jawher/mow.cli v1.1.0/go.mod 
h1:aNaQlc7ozF3vw6IJ2dHjp2ZFiA4ozMIYY6PyuRJwlUg=
 github.com/klauspost/compress v1.9.2 
h1:LfVyl+ZlLlLDeQ/d2AqfGIIH4qEDu0Ed2S5GyhCWIWY=
 github.com/klauspost/compress v1.9.2/go.mod 
h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
 github.com/konsorten/go-windows-terminal-sequences v1.0.1 
h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
@@ -31,8 +42,36 @@ github.com/spf13/pflag v1.0.3 
h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg=
 github.com/spf13/pflag v1.0.3/go.mod 
h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
 github.com/stretchr/objx v0.1.0/go.mod 
h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
 github.com/stretchr/objx v0.1.1/go.mod 
h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/objx v0.2.0 h1:Hbg2NidpLE8veEBkEZTL3CvlkUIVzuU9jDplZO54c48=
+github.com/stretchr/objx v0.2.0/go.mod 
h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
 github.com/stretchr/testify v1.2.2/go.mod 
h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
 github.com/stretchr/testify v1.3.0 
h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
 github.com/stretchr/testify v1.3.0/go.mod 
h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
+github.com/stretchr/testify v1.4.0 
h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
+github.com/stretchr/testify v1.4.0/go.mod 
h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
+github.com/yahoo/athenz v1.8.55 h1:xGhxN3yLq334APyn0Zvcc+aqu78Q7BBhYJevM3EtTW0=
+github.com/yahoo/athenz v1.8.55/go.mod 
h1:G7LLFUH7Z/r4QAB7FfudfuA7Am/eCzO1GlzBhDL6Kv0=
+golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod 
h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod 
h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
+golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod 
h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod 
h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod 
h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod 
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33 
h1:I6FyU15t786LL7oL/hn43zqTuEGr4PN7F4XJ1p4E3Y8=
 golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod 
h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod 
h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod 
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20190804053845-51ab0e2deafa 
h1:KIDDMLT1O0Nr7TSxp8xM5tJcdn8tgyAONntO829og1M=
+golang.org/x/sys v0.0.0-20190804053845-51ab0e2deafa/go.mod 
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
+golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod 
h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+golang.org/x/tools v0.0.0-20190808195139-e713427fea3f/go.mod 
h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod 
h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod 
h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod 
h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod 
h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k=
+gopkg.in/square/go-jose.v2 v2.4.1/go.mod 
h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI=
+gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.2.7 h1:VUgggvou5XRW9mHwD/yXxIYSMtY0zoKQf/v226p2nyo=
+gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
diff --git a/pulsar/client.go b/pulsar/client.go
index 5a98496..53de468 100644
--- a/pulsar/client.go
+++ b/pulsar/client.go
@@ -57,6 +57,11 @@ func NewAuthenticationTLS(certificatePath string, privateKeyPath string) Authent
        return auth.NewAuthenticationTLS(certificatePath, privateKeyPath)
 }
 
+func NewAuthenticationAthenz(authParams map[string]string) Authentication {
+       athenz, _ := auth.NewAuthenticationAthenzWithParams(authParams)
+       return athenz
+}
+
 // Builder interface that is used to construct a Pulsar Client instance.
 type ClientOptions struct {
        // Configure the service URL for the Pulsar service.
diff --git a/pulsar/client_impl.go b/pulsar/client_impl.go
index f768389..02d9883 100644
--- a/pulsar/client_impl.go
+++ b/pulsar/client_impl.go
@@ -80,6 +80,10 @@ func newClient(options ClientOptions) (Client, error) {
                        return nil, errors.New("invalid auth provider interface")
                }
        }
+       err = authProvider.Init()
+       if err != nil {
+               return nil, err
+       }
 
        connectionTimeout := options.ConnectionTimeout
        if connectionTimeout.Nanoseconds() == 0 {
diff --git a/pulsar/internal/auth/athenz.go b/pulsar/internal/auth/athenz.go
new file mode 100644
index 0000000..3d650f7
--- /dev/null
+++ b/pulsar/internal/auth/athenz.go
@@ -0,0 +1,179 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package auth
+
+import (
+       "crypto/tls"
+       "encoding/base64"
+       "errors"
+       "io/ioutil"
+       "regexp"
+       "strings"
+       "time"
+
+       zms "github.com/yahoo/athenz/libs/go/zmssvctoken"
+       zts "github.com/yahoo/athenz/libs/go/ztsroletoken"
+)
+
+const (
+       minExpire = 2 * time.Hour
+       maxExpire = 24 * time.Hour
+)
+
+type athenzAuthProvider struct {
+       providerDomain     string
+       tenantDomain       string
+       tenantService      string
+       privateKey         string
+       keyID              string
+       principalHeader    string
+       ztsURL             string
+       tokenBuilder       zms.TokenBuilder
+       roleToken          zts.RoleToken
+       zmsNewTokenBuilder func(domain, name string, privateKeyPEM []byte, keyVersion string) (zms.TokenBuilder, error)
+       ztsNewRoleToken    func(tok zms.Token, domain string, opts zts.RoleTokenOptions) zts.RoleToken
+}
+
+type privateKeyURI struct {
+       Scheme                   string
+       MediaTypeAndEncodingType string
+       Data                     string
+       Path                     string
+}
+
+func NewAuthenticationAthenzWithParams(params map[string]string) (Provider, error) {
+       return NewAuthenticationAthenz(
+               params["providerDomain"],
+               params["tenantDomain"],
+               params["tenantService"],
+               params["privateKey"],
+               params["keyId"],
+               params["principalHeader"],
+               params["ztsUrl"],
+       ), nil
+}
+
+func NewAuthenticationAthenz(
+       providerDomain string,
+       tenantDomain string,
+       tenantService string,
+       privateKey string,
+       keyID string,
+       principalHeader string,
+       ztsURL string) Provider {
+       var fixedKeyID string
+       if keyID == "" {
+               fixedKeyID = "0"
+       } else {
+               fixedKeyID = keyID
+       }
+       ztsNewRoleToken := func(tok zms.Token, domain string, opts zts.RoleTokenOptions) zts.RoleToken {
+               return zts.RoleToken(zts.NewRoleToken(tok, domain, opts))
+       }
+
+       return &athenzAuthProvider{
+               providerDomain:     providerDomain,
+               tenantDomain:       tenantDomain,
+               tenantService:      tenantService,
+               privateKey:         privateKey,
+               keyID:              fixedKeyID,
+               principalHeader:    principalHeader,
+               ztsURL:             strings.TrimSuffix(ztsURL, "/"),
+               zmsNewTokenBuilder: zms.NewTokenBuilder,
+               ztsNewRoleToken:    ztsNewRoleToken,
+       }
+}
+
+func (p *athenzAuthProvider) Init() error {
+       uriSt := parseURI(p.privateKey)
+       var keyData []byte
+
+       if uriSt.Scheme == "data" {
+               if uriSt.MediaTypeAndEncodingType != "application/x-pem-file;base64" {
+                       return errors.New("Unsupported mediaType or encodingType: " + uriSt.MediaTypeAndEncodingType)
+               }
+               key, err := base64.StdEncoding.DecodeString(uriSt.Data)
+               if err != nil {
+                       return err
+               }
+               keyData = key
+       } else if uriSt.Scheme == "file" {
+               key, err := ioutil.ReadFile(uriSt.Path)
+               if err != nil {
+                       return err
+               }
+               keyData = key
+       } else {
+               return errors.New("Unsupported URI Scheme: " + uriSt.Scheme)
+       }
+
+       tb, err := p.zmsNewTokenBuilder(p.tenantDomain, p.tenantService, keyData, p.keyID)
+       if err != nil {
+               return err
+       }
+       p.tokenBuilder = tb
+
+       roleToken := p.ztsNewRoleToken(p.tokenBuilder.Token(), p.providerDomain, zts.RoleTokenOptions{
+               BaseZTSURL: p.ztsURL + "/zts/v1",
+               MinExpire:  minExpire,
+               MaxExpire:  maxExpire,
+               AuthHeader: p.principalHeader,
+       })
+       p.roleToken = roleToken
+
+       return nil
+}
+
+func (p *athenzAuthProvider) Name() string {
+       return "athenz"
+}
+
+func (p *athenzAuthProvider) GetTLSCertificate() (*tls.Certificate, error) {
+       return nil, nil
+}
+
+func (p *athenzAuthProvider) GetData() ([]byte, error) {
+       tok, err := p.roleToken.RoleTokenValue()
+       if err != nil {
+               return nil, err
+       }
+
+       return []byte(tok), nil
+}
+
+func (p *athenzAuthProvider) Close() error {
+       return nil
+}
+
+func parseURI(uri string) privateKeyURI {
+       var uriSt privateKeyURI
+       // scheme mediatype[;base64] path file
+       const expression = `^(?:([^:/?#]+):)(?:([;/\\\-\w]*),)?(?:/{0,2}((?:[^?#/]*/)*))?([^?#]*)`
+
+       // when expression cannot be parsed, then panics
+       re := regexp.MustCompile(expression)
+       if re.MatchString(uri) {
+               groups := re.FindStringSubmatch(uri)
+               uriSt.Scheme = groups[1]
+               uriSt.MediaTypeAndEncodingType = groups[2]
+               uriSt.Data = groups[4]
+               uriSt.Path = groups[3] + groups[4]
+       }
+
+       return uriSt
+}
diff --git a/pulsar/internal/auth/athenz_test.go b/pulsar/internal/auth/athenz_test.go
new file mode 100644
index 0000000..9fbb670
--- /dev/null
+++ b/pulsar/internal/auth/athenz_test.go
@@ -0,0 +1,130 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package auth
+
+import (
+       "bytes"
+       "errors"
+       "io/ioutil"
+       "testing"
+       "time"
+
+       "github.com/stretchr/testify/assert"
+       mock "github.com/stretchr/testify/mock"
+       zms "github.com/yahoo/athenz/libs/go/zmssvctoken"
+       zts "github.com/yahoo/athenz/libs/go/ztsroletoken"
+)
+
+const tlsClientKeyPath = "../../../integration-tests/certs/client-key.pem"
+
+type MockTokenBuilder struct {
+       mock.Mock
+}
+
+type MockToken struct {
+       mock.Mock
+}
+
+type MockRoleToken struct {
+       mock.Mock
+}
+
+func (m *MockTokenBuilder) SetExpiration(t time.Duration) {
+}
+func (m *MockTokenBuilder) SetHostname(h string) {
+}
+func (m *MockTokenBuilder) SetIPAddress(ip string) {
+}
+func (m *MockTokenBuilder) SetKeyService(keyService string) {
+}
+func (m *MockTokenBuilder) Token() zms.Token {
+       result := m.Called()
+       return result.Get(0).(zms.Token)
+}
+
+func (m *MockToken) Value() (string, error) {
+       result := m.Called()
+       return result.Get(0).(string), result.Error(1)
+}
+
+func (m *MockRoleToken) RoleTokenValue() (string, error) {
+       result := m.Called()
+       return result.Get(0).(string), result.Error(1)
+}
+
+func MockZmsNewTokenBuilder(domain, name string, privateKeyPEM []byte, keyVersion string) (zms.TokenBuilder, error) {
+       // assertion
+       key, err := ioutil.ReadFile(tlsClientKeyPath)
+       if err != nil {
+               return nil, err
+       }
+       if domain != "pulsar.test.tenant" ||
+               name != "service" ||
+               !bytes.Equal(key, privateKeyPEM) ||
+               keyVersion != "0" {
+               return nil, errors.New("Assertion error")
+       }
+
+       mockToken := new(MockToken)
+       mockToken.On("Value").Return("mockPrincipalToken", nil)
+       mockTokenBuilder := new(MockTokenBuilder)
+       mockTokenBuilder.On("Token").Return(mockToken)
+       return mockTokenBuilder, nil
+}
+
+func MockZtsNewRoleToken(tok zms.Token, domain string, opts zts.RoleTokenOptions) zts.RoleToken {
+       // assertion
+       token, err := tok.Value()
+       if err != nil {
+               return nil
+       }
+       if token != "mockPrincipalToken" ||
+               domain != "pulsar.test.provider" ||
+               opts.BaseZTSURL != "http://localhost:9999/zts/v1" ||
+               opts.AuthHeader != "" {
+               return nil
+       }
+
+       mockRoleToken := new(MockRoleToken)
+       mockRoleToken.On("RoleTokenValue").Return("mockRoleToken", nil)
+       return mockRoleToken
+}
+
+func TestAthenzAuth(t *testing.T) {
+       privateKey := "file://" + tlsClientKeyPath
+       provider := NewAuthenticationAthenz(
+               "pulsar.test.provider",
+               "pulsar.test.tenant",
+               "service",
+               privateKey,
+               "",
+               "",
+               "http://localhost:9999")
+
+       // inject mock function
+       athenz := provider.(*athenzAuthProvider)
+       athenz.zmsNewTokenBuilder = MockZmsNewTokenBuilder
+       athenz.ztsNewRoleToken = MockZtsNewRoleToken
+
+       err := athenz.Init()
+       assert.NoError(t, err)
+
+       data, err := athenz.GetData()
+       assert.Equal(t, []byte("mockRoleToken"), data)
+       assert.NoError(t, err)
+}
diff --git a/pulsar/internal/auth/provider.go b/pulsar/internal/auth/provider.go
index 8868df5..220ff14 100644
--- a/pulsar/internal/auth/provider.go
+++ b/pulsar/internal/auth/provider.go
@@ -19,6 +19,7 @@ package auth
 
 import (
        "crypto/tls"
+       "encoding/json"
        "fmt"
        "io"
 
@@ -59,11 +60,16 @@ func NewProvider(name string, params string) (Provider, error) {
        case "token", "org.apache.pulsar.client.impl.auth.AuthenticationToken":
                return NewAuthenticationTokenWithParams(m)
 
+       case "athenz", "org.apache.pulsar.client.impl.auth.AuthenticationAthenz":
+               return NewAuthenticationAthenzWithParams(m)
+
        default:
                return nil, errors.New(fmt.Sprintf("invalid auth provider '%s'", name))
        }
 }
 
 func parseParams(params string) map[string]string {
-       return nil
+       var mapString map[string]string
+       json.Unmarshal([]byte(params), &mapString)
+       return mapString
 }

Reply via email to