This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new b9f8c5c  Support oauth2 authentication for pulsar-client-go (#313)
b9f8c5c is described below

commit b9f8c5cedefb1bfb82faee09ce06f48c8f7300f5
Author: Yong Zhang <[email protected]>
AuthorDate: Wed Jul 15 16:36:26 2020 +0800

    Support oauth2 authentication for pulsar-client-go (#313)
    
    * Authentication provider for OAuth 2.0
    - based on cloud-cli @ bc645b16ca7b7474b132ee1da8b56da35025a616
    
    * Add tests and Update license
    
    * Revert zstd version
    
    * Refactor to support multiple issuers.
    - decouple the issuer parameter from the audience
    - use issuer information that is in the keyfile
    
    * Address comments
    
    * Add tests
    
    * Change clock package
    
    Co-authored-by: Eron Wright <[email protected]>
---
 go.mod                                      |   7 +-
 go.sum                                      |  64 +++++-
 oauth2/auth.go                              | 120 ++++++++++
 oauth2/auth_suite_test.go                   |  65 ++++++
 oauth2/authorization_tokenretriever.go      | 338 ++++++++++++++++++++++++++++
 oauth2/authorization_tokenretriever_test.go | 338 ++++++++++++++++++++++++++++
 oauth2/cache/cache.go                       | 142 ++++++++++++
 oauth2/client_credentials_flow.go           | 158 +++++++++++++
 oauth2/client_credentials_flow_test.go      | 183 +++++++++++++++
 oauth2/client_credentials_provider.go       |  66 ++++++
 oauth2/clock/clock.go                       |  98 ++++++++
 oauth2/clock/testing/fake_clock.go          | 275 ++++++++++++++++++++++
 oauth2/config_tokenprovider.go              |  57 +++++
 oauth2/config_tokenprovider_test.go         |  91 ++++++++
 oauth2/device_code_flow.go                  | 203 +++++++++++++++++
 oauth2/device_code_flow_test.go             | 230 +++++++++++++++++++
 oauth2/device_code_provider.go              | 133 +++++++++++
 oauth2/go.mod                               |  12 +
 oauth2/go.sum                               | 113 ++++++++++
 oauth2/oidc_endpoint_provider.go            |  58 +++++
 oauth2/oidc_endpoint_provider_test.go       |  92 ++++++++
 oauth2/store/keyring.go                     | 194 ++++++++++++++++
 oauth2/store/memory.go                      |  87 +++++++
 oauth2/store/store.go                       |  45 ++++
 pulsar/client.go                            |   5 +
 pulsar/client_impl_test.go                  |  94 ++++++++
 pulsar/internal/auth/oauth2.go              | 145 ++++++++++++
 pulsar/internal/auth/oauth2_test.go         | 117 ++++++++++
 pulsar/internal/auth/token.go               |   2 +-
 29 files changed, 3524 insertions(+), 8 deletions(-)

diff --git a/go.mod b/go.mod
index afae8cc..969b35a 100644
--- a/go.mod
+++ b/go.mod
@@ -1,9 +1,10 @@
 module github.com/apache/pulsar-client-go
 
-go 1.12
+go 1.13
 
 require (
        github.com/DataDog/zstd v1.4.6-0.20200617134701-89f69fb7df32
+       github.com/apache/pulsar-client-go/oauth2 
v0.0.0-00010101000000-000000000000
        github.com/beefsack/go-rate v0.0.0-20180408011153-efa7637bb9b6
        github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b
        github.com/gogo/protobuf v1.3.1
@@ -11,7 +12,7 @@ require (
        github.com/klauspost/compress v1.10.8
        github.com/kr/pretty v0.2.0 // indirect
        github.com/pierrec/lz4 v2.0.5+incompatible
-       github.com/pkg/errors v0.8.1
+       github.com/pkg/errors v0.9.1
        github.com/prometheus/client_golang v1.7.1
        github.com/sirupsen/logrus v1.4.2
        github.com/spaolacci/murmur3 v1.1.0
@@ -20,3 +21,5 @@ require (
        github.com/stretchr/testify v1.4.0
        github.com/yahoo/athenz v1.8.55
 )
+
+replace github.com/apache/pulsar-client-go/oauth2 => ./oauth2
diff --git a/go.sum b/go.sum
index 9f51dbb..e766ca5 100644
--- a/go.sum
+++ b/go.sum
@@ -1,3 +1,6 @@
+cloud.google.com/go v0.34.0/go.mod 
h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
+github.com/99designs/keyring v1.1.5 
h1:wLv7QyzYpFIyMSwOADq1CLTF9KbjbBfcnfmOGJ64aO4=
+github.com/99designs/keyring v1.1.5/go.mod 
h1:7hsVvt2qXgtadGevGJ4ujg+u8m6SpJ5TpHqTozIPqf0=
 github.com/BurntSushi/toml v0.3.1/go.mod 
h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
 github.com/DataDog/zstd v1.4.6-0.20200617134701-89f69fb7df32 
h1:/gZKpgSMydtrih81nvUhlkXpZIUfthKShSCVbRzBt9Y=
 github.com/DataDog/zstd v1.4.6-0.20200617134701-89f69fb7df32/go.mod 
h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
@@ -19,16 +22,27 @@ github.com/bmizerany/perks 
v0.0.0-20141205001514-d9a9656a3a4b/go.mod h1:ac9efd0D
 github.com/boynton/repl v0.0.0-20170116235056-348863958e3e/go.mod 
h1:Crc/GCZ3NXDVCio7Yr0o+SSrytpcFhLmVCIzi0s49t4=
 github.com/cespare/xxhash/v2 v2.1.1 
h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY=
 github.com/cespare/xxhash/v2 v2.1.1/go.mod 
h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
+github.com/danieljoos/wincred v1.0.2 
h1:zf4bhty2iLuwgjgpraD2E9UbvO+fe54XXGJbOwe23fU=
+github.com/danieljoos/wincred v1.0.2/go.mod 
h1:SnuYRW9lp1oJrZX/dXJqr0cPK5gYXqx3EJbmjhLdK9U=
 github.com/davecgh/go-spew v1.1.0/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 
h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/dgrijalva/jwt-go v3.2.0+incompatible 
h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
+github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod 
h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
 github.com/dimfeld/httptreemux v5.0.1+incompatible 
h1:Qj3gVcDNoOthBAqftuD596rm4wg/adLLz5xh5CmpiCA=
 github.com/dimfeld/httptreemux v5.0.1+incompatible/go.mod 
h1:rbUlSV+CCpv/SuqUTP/8Bk2O3LyUV436/yaRGkhP6Z0=
+github.com/dvsekhvalnov/jose2go v0.0.0-20180829124132-7f401d37b68a 
h1:mq+R6XEM6lJX5VlLyZIrUSP8tSuJp82xTK89hvBwJbU=
+github.com/dvsekhvalnov/jose2go v0.0.0-20180829124132-7f401d37b68a/go.mod 
h1:7BvyPhdbLxMXIYTFPLsyJRFMsKmOZnQmzh6Gb+uquuM=
+github.com/fsnotify/fsnotify v1.4.7/go.mod 
h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
+github.com/fsnotify/fsnotify v1.4.9 
h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
+github.com/fsnotify/fsnotify v1.4.9/go.mod 
h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
 github.com/go-kit/kit v0.8.0/go.mod 
h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
 github.com/go-kit/kit v0.9.0/go.mod 
h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
 github.com/go-logfmt/logfmt v0.3.0/go.mod 
h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
 github.com/go-logfmt/logfmt v0.4.0/go.mod 
h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
 github.com/go-stack/stack v1.8.0/go.mod 
h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
+github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 
h1:ZpnhV/YsD2/4cESfV5+Hoeu/iUR3ruzNvZ+yQfO03a0=
+github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2/go.mod 
h1:bBOAhwG1umN6/6ZUMtDFBMQR8jRg9O75tm9K00oMsK4=
 github.com/gogo/protobuf v1.1.1/go.mod 
h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
 github.com/gogo/protobuf v1.3.1 h1:DqDEcV5aeaTmdFBePNpYsp3FlcVH/2ISVVM9Qf8PSls=
 github.com/gogo/protobuf v1.3.1/go.mod 
h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o=
@@ -49,6 +63,9 @@ github.com/google/go-cmp v0.4.0/go.mod 
h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
 github.com/google/gofuzz v1.0.0/go.mod 
h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
 github.com/gorilla/context v1.1.1/go.mod 
h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg=
 github.com/gorilla/mux v1.7.3/go.mod 
h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
+github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c 
h1:6rhixN/i8ZofjG1Y75iExal34USq5p+wiN1tpie8IrU=
+github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c/go.mod 
h1:NMPJylDgVpX0MLRlPy15sqSwOFv/U1GZ2m21JhFfek0=
+github.com/hpcloud/tail v1.0.0/go.mod 
h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
 github.com/inconshreveable/mousetrap v1.0.0 
h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
 github.com/inconshreveable/mousetrap v1.0.0/go.mod 
h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
 github.com/jawher/mow.cli v1.0.4/go.mod 
h1:5hQj2V8g+qYmLUVWqu4Wuja1pI57M83EChYLVZ0sMKk=
@@ -56,6 +73,8 @@ github.com/jawher/mow.cli v1.1.0/go.mod 
h1:aNaQlc7ozF3vw6IJ2dHjp2ZFiA4ozMIYY6Pyu
 github.com/json-iterator/go v1.1.6/go.mod 
h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
 github.com/json-iterator/go v1.1.10/go.mod 
h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
 github.com/julienschmidt/httprouter v1.2.0/go.mod 
h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
+github.com/keybase/go-keychain v0.0.0-20190712205309-48d3d31d256d 
h1:Z+RDyXzjKE0i2sTjZ/b1uxiGtPhFy34Ou/Tk0qwN0kM=
+github.com/keybase/go-keychain v0.0.0-20190712205309-48d3d31d256d/go.mod 
h1:JJNrCn9otv/2QP4D7SMJBgaleKpOf66PnW6F5WGNRIc=
 github.com/kisielk/errcheck v1.2.0/go.mod 
h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00=
 github.com/kisielk/gotool v1.0.0/go.mod 
h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
 github.com/klauspost/compress v1.10.8 
h1:eLeJ3dr/Y9+XRfJT4l+8ZjmtB5RPJhucH2HeCV5+IZY=
@@ -71,16 +90,30 @@ github.com/kr/text v0.1.0 
h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
 github.com/kr/text v0.1.0/go.mod 
h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
 github.com/matttproud/golang_protobuf_extensions v1.0.1 
h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
 github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod 
h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
+github.com/mitchellh/go-homedir v1.1.0 
h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y=
+github.com/mitchellh/go-homedir v1.1.0/go.mod 
h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
 github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod 
h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
 github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod 
h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
 github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod 
h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
 github.com/modern-go/reflect2 v1.0.1/go.mod 
h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
+github.com/mtibben/percent v0.2.1 
h1:5gssi8Nqo8QU/r2pynCm+hBQHpkB/uNK7BJCFogWdzs=
+github.com/mtibben/percent v0.2.1/go.mod 
h1:KG9uO+SZkUp+VkRHsCdYQV3XSZrrSpR3O9ibNBTZrns=
 github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod 
h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
+github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78=
+github.com/nxadm/tail v1.4.4/go.mod 
h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
+github.com/onsi/ginkgo v1.6.0/go.mod 
h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
+github.com/onsi/ginkgo v1.12.1/go.mod 
h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk=
+github.com/onsi/ginkgo v1.14.0 h1:2mOpI4JVVPBN+WQRa0WKH2eXR+Ey+uK4n7Zj0aYpIQA=
+github.com/onsi/ginkgo v1.14.0/go.mod 
h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY=
+github.com/onsi/gomega v1.7.1/go.mod 
h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
+github.com/onsi/gomega v1.10.1 h1:o0+MgICZLuZ7xjH7Vx6zS/zcu93/BEp1VwkIW1mEXCE=
+github.com/onsi/gomega v1.10.1/go.mod 
h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
 github.com/pierrec/lz4 v2.0.5+incompatible 
h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I=
 github.com/pierrec/lz4 v2.0.5+incompatible/go.mod 
h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
 github.com/pkg/errors v0.8.0/go.mod 
h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
-github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
 github.com/pkg/errors v0.8.1/go.mod 
h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
+github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
+github.com/pkg/errors v0.9.1/go.mod 
h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
 github.com/pmezard/go-difflib v1.0.0 
h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 github.com/pmezard/go-difflib v1.0.0/go.mod 
h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 github.com/prometheus/client_golang v0.9.1/go.mod 
h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
@@ -112,7 +145,6 @@ github.com/stretchr/objx v0.1.1/go.mod 
h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
 github.com/stretchr/objx v0.2.0 h1:Hbg2NidpLE8veEBkEZTL3CvlkUIVzuU9jDplZO54c48=
 github.com/stretchr/objx v0.2.0/go.mod 
h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
 github.com/stretchr/testify v1.2.2/go.mod 
h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
-github.com/stretchr/testify v1.3.0 
h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
 github.com/stretchr/testify v1.3.0/go.mod 
h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
 github.com/stretchr/testify v1.4.0 
h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
 github.com/stretchr/testify v1.4.0/go.mod 
h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
@@ -120,28 +152,43 @@ github.com/yahoo/athenz v1.8.55 
h1:xGhxN3yLq334APyn0Zvcc+aqu78Q7BBhYJevM3EtTW0=
 github.com/yahoo/athenz v1.8.55/go.mod 
h1:G7LLFUH7Z/r4QAB7FfudfuA7Am/eCzO1GlzBhDL6Kv0=
 golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod 
h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
 golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod 
h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 
h1:HuIa8hRrWRSrqYzx1qI49NNxhdi2PrY7gxVSq1JjLDc=
 golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod 
h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
+golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod 
h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod 
h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
 golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod 
h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod 
h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
 golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod 
h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
 golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod 
h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod 
h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod 
h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7 
h1:AeiKBIuRw3UomYXSbLy0Mc2dDLfdtbT/IVn4keq83P0=
+golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod 
h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
+golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d 
h1:TzXSXBo42m9gQenoE3b9BGiEpg5IG2JkU5FkPIawgtw=
+golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod 
h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
+golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod 
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod 
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod 
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod 
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod 
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
-golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33 
h1:I6FyU15t786LL7oL/hn43zqTuEGr4PN7F4XJ1p4E3Y8=
 golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod 
h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod 
h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod 
h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod 
h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod 
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod 
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
-golang.org/x/sys v0.0.0-20190804053845-51ab0e2deafa 
h1:KIDDMLT1O0Nr7TSxp8xM5tJcdn8tgyAONntO829og1M=
+golang.org/x/sys v0.0.0-20190712062909-fae7ac547cb7/go.mod 
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20190804053845-51ab0e2deafa/go.mod 
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod 
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod 
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod 
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e/go.mod 
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod 
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod 
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1 
h1:ogLJMz+qpzav7lGMh10LMvAkM/fAoGlaiiHYiFYdm80=
 golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1/go.mod 
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
 golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
 golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod 
h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
 golang.org/x/tools v0.0.0-20181030221726-6c7e314b6563/go.mod 
h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
@@ -149,6 +196,8 @@ golang.org/x/tools 
v0.0.0-20190808195139-e713427fea3f/go.mod h1:b+2E5dAYhXwXZwtn
 golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod 
h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 
h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
 golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod 
h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+google.golang.org/appengine v1.4.0 
h1:/wp5JvzpHIxhs/dumFmF7BXTf3Z+dd4uXta4kVyO508=
+google.golang.org/appengine v1.4.0/go.mod 
h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
 google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod 
h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
 google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod 
h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
 google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod 
h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
@@ -158,13 +207,18 @@ google.golang.org/protobuf v1.23.0 
h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyz
 google.golang.org/protobuf v1.23.0/go.mod 
h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
 gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod 
h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod 
h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod 
h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 
h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
 gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod 
h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/fsnotify.v1 v1.4.7/go.mod 
h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
 gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod 
h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k=
 gopkg.in/square/go-jose.v2 v2.4.1/go.mod 
h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI=
+gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 
h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
+gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod 
h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
 gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
-gopkg.in/yaml.v2 v2.2.7 h1:VUgggvou5XRW9mHwD/yXxIYSMtY0zoKQf/v226p2nyo=
 gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
+gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
diff --git a/oauth2/auth.go b/oauth2/auth.go
new file mode 100644
index 0000000..867afae
--- /dev/null
+++ b/oauth2/auth.go
@@ -0,0 +1,120 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package oauth2
+
+import (
+       "fmt"
+       "time"
+
+       "github.com/apache/pulsar-client-go/oauth2/clock"
+       "github.com/dgrijalva/jwt-go"
+       "golang.org/x/oauth2"
+)
+
+const (
+       ClaimNameUserName = "https://pulsar.apache.org/username";
+)
+
+// Flow abstracts an OAuth 2.0 authentication and authorization flow
+type Flow interface {
+       // Authorize obtains an authorization grant based on an OAuth 2.0 
authorization flow.
+       // The method returns a grant which may contain an initial access token.
+       Authorize(audience string) (*AuthorizationGrant, error)
+}
+
+// AuthorizationGrantRefresher refreshes OAuth 2.0 authorization grant
+type AuthorizationGrantRefresher interface {
+       // Refresh refreshes an authorization grant to contain a fresh access 
token
+       Refresh(grant *AuthorizationGrant) (*AuthorizationGrant, error)
+}
+
+type AuthorizationGrantType string
+
+const (
+       // GrantTypeClientCredentials represents a client credentials grant
+       GrantTypeClientCredentials AuthorizationGrantType = "client_credentials"
+
+       // GrantTypeDeviceCode represents a device code grant
+       GrantTypeDeviceCode AuthorizationGrantType = "device_code"
+)
+
+// AuthorizationGrant is a credential representing the resource owner's 
authorization
+// to access its protected resources, and is used by the client to obtain an 
access token
+type AuthorizationGrant struct {
+       // Type describes the type of authorization grant represented by this 
structure
+       Type AuthorizationGrantType `json:"type"`
+
+       // Audience is the intended audience of the access tokens
+       Audience string `json:"audience,omitempty"`
+
+       // ClientID is an OAuth2 client identifier used by some flows
+       ClientID string `json:"client_id,omitempty"`
+
+       // ClientCredentials is credentials data for the client credentials 
grant type
+       ClientCredentials *KeyFile `json:"client_credentials,omitempty"`
+
+       // the token endpoint
+       TokenEndpoint string `json:"token_endpoint"`
+
+       // Token contains an access token in the client credentials grant type,
+       // and a refresh token in the device authorization grant type
+       Token *oauth2.Token `json:"token,omitempty"`
+}
+
+// TokenResult holds token information
+type TokenResult struct {
+       AccessToken  string `json:"access_token"`
+       IDToken      string `json:"id_token"`
+       RefreshToken string `json:"refresh_token"`
+       ExpiresIn    int    `json:"expires_in"`
+}
+
+// Issuer holds information about the issuer of tokens
+type Issuer struct {
+       IssuerEndpoint string
+       ClientID       string
+       Audience       string
+}
+
+func convertToOAuth2Token(token *TokenResult, clock clock.Clock) oauth2.Token {
+       return oauth2.Token{
+               AccessToken:  token.AccessToken,
+               TokenType:    "bearer",
+               RefreshToken: token.RefreshToken,
+               Expiry:       clock.Now().Add(time.Duration(token.ExpiresIn) * 
time.Second),
+       }
+}
+
+// ExtractUserName extracts the username claim from an authorization grant
+func ExtractUserName(token oauth2.Token) (string, error) {
+       p := jwt.Parser{}
+       claims := jwt.MapClaims{}
+       if _, _, err := p.ParseUnverified(token.AccessToken, claims); err != 
nil {
+               return "", fmt.Errorf("unable to decode the access token: %v", 
err)
+       }
+       username, ok := claims[ClaimNameUserName]
+       if !ok {
+               return "", fmt.Errorf("access token doesn't contain a username 
claim")
+       }
+       switch v := username.(type) {
+       case string:
+               return v, nil
+       default:
+               return "", fmt.Errorf("access token contains an unsupported 
username claim")
+       }
+}
diff --git a/oauth2/auth_suite_test.go b/oauth2/auth_suite_test.go
new file mode 100644
index 0000000..95accff
--- /dev/null
+++ b/oauth2/auth_suite_test.go
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package oauth2
+
+import (
+       "context"
+       "testing"
+
+       . "github.com/onsi/ginkgo"
+       . "github.com/onsi/gomega"
+)
+
+func TestAuth(t *testing.T) {
+       RegisterFailHandler(Fail)
+       RunSpecs(t, "cloud-cli Auth Suite")
+}
+
+type MockTokenExchanger struct {
+       CalledWithRequest        interface{}
+       ReturnsTokens            *TokenResult
+       ReturnsError             error
+       RefreshCalledWithRequest *RefreshTokenExchangeRequest
+}
+
+func (te *MockTokenExchanger) ExchangeCode(req 
AuthorizationCodeExchangeRequest) (*TokenResult, error) {
+       te.CalledWithRequest = &req
+       return te.ReturnsTokens, te.ReturnsError
+}
+
+func (te *MockTokenExchanger) ExchangeRefreshToken(req 
RefreshTokenExchangeRequest) (*TokenResult, error) {
+       te.RefreshCalledWithRequest = &req
+       return te.ReturnsTokens, te.ReturnsError
+}
+
+func (te *MockTokenExchanger) ExchangeClientCredentials(req 
ClientCredentialsExchangeRequest) (*TokenResult, error) {
+       te.CalledWithRequest = &req
+       return te.ReturnsTokens, te.ReturnsError
+}
+
+func (te *MockTokenExchanger) ExchangeDeviceCode(ctx context.Context,
+       req DeviceCodeExchangeRequest) (*TokenResult, error) {
+       te.CalledWithRequest = &req
+       return te.ReturnsTokens, te.ReturnsError
+}
+
+var oidcEndpoints = OIDCWellKnownEndpoints{
+       AuthorizationEndpoint:       "http://issuer/auth/authorize";,
+       TokenEndpoint:               "http://issuer/auth/token";,
+       DeviceAuthorizationEndpoint: "http://issuer/auth/authorize/device";,
+}
diff --git a/oauth2/authorization_tokenretriever.go 
b/oauth2/authorization_tokenretriever.go
new file mode 100644
index 0000000..93c1bfe
--- /dev/null
+++ b/oauth2/authorization_tokenretriever.go
@@ -0,0 +1,338 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package oauth2
+
+import (
+       "context"
+       "encoding/json"
+       "errors"
+       "fmt"
+       "net/http"
+       "net/url"
+       "strconv"
+       "strings"
+       "time"
+)
+
+// TokenRetriever implements AuthTokenExchanger in order to facilitate getting
+// Tokens
+type TokenRetriever struct {
+       transport HTTPAuthTransport
+}
+
+// AuthorizationTokenResponse is the HTTP response when asking for a new token.
+// Note that not all fields will contain data based on what kind of request was
+// sent
+type AuthorizationTokenResponse struct {
+       AccessToken  string `json:"access_token"`
+       ExpiresIn    int    `json:"expires_in"`
+       IDToken      string `json:"id_token"`
+       RefreshToken string `json:"refresh_token"`
+       TokenType    string `json:"token_type"`
+}
+
+// AuthorizationCodeExchangeRequest is used to request the exchange of an
+// authorization code for a token
+type AuthorizationCodeExchangeRequest struct {
+       TokenEndpoint string
+       ClientID      string
+       CodeVerifier  string
+       Code          string
+       RedirectURI   string
+}
+
+// RefreshTokenExchangeRequest is used to request the exchange of a refresh
+// token for a refreshed token
+type RefreshTokenExchangeRequest struct {
+       TokenEndpoint string
+       ClientID      string
+       RefreshToken  string
+}
+
+// ClientCredentialsExchangeRequest is used to request the exchange of
+// client credentials for a token
+type ClientCredentialsExchangeRequest struct {
+       TokenEndpoint string
+       ClientID      string
+       ClientSecret  string
+       Audience      string
+}
+
+// DeviceCodeExchangeRequest is used to request the exchange of
+// a device code for a token
+type DeviceCodeExchangeRequest struct {
+       TokenEndpoint string
+       ClientID      string
+       DeviceCode    string
+       PollInterval  time.Duration
+}
+
+// TokenErrorResponse is used to parse error responses from the token endpoint
+type TokenErrorResponse struct {
+       Error            string `json:"error"`
+       ErrorDescription string `json:"error_description"`
+}
+
+type TokenError struct {
+       ErrorCode        string
+       ErrorDescription string
+}
+
+func (e *TokenError) Error() string {
+       if e.ErrorDescription != "" {
+               return fmt.Sprintf("%s (%s)", e.ErrorDescription, e.ErrorCode)
+       }
+       return e.ErrorCode
+}
+
+// HTTPAuthTransport abstracts how an HTTP exchange request is sent and 
received
+type HTTPAuthTransport interface {
+       Do(request *http.Request) (*http.Response, error)
+}
+
+// NewTokenRetriever allows a TokenRetriever the internal of a new
+// TokenRetriever to be easily set up
+func NewTokenRetriever(authTransport HTTPAuthTransport) *TokenRetriever {
+       return &TokenRetriever{
+               transport: authTransport,
+       }
+}
+
+// newExchangeCodeRequest builds a new AuthTokenRequest wrapped in an
+// http.Request
+func (ce *TokenRetriever) newExchangeCodeRequest(
+       req AuthorizationCodeExchangeRequest) (*http.Request, error) {
+       uv := url.Values{}
+       uv.Set("grant_type", "authorization_code")
+       uv.Set("client_id", req.ClientID)
+       uv.Set("code_verifier", req.CodeVerifier)
+       uv.Set("code", req.Code)
+       uv.Set("redirect_uri", req.RedirectURI)
+
+       euv := uv.Encode()
+
+       request, err := http.NewRequest("POST",
+               req.TokenEndpoint,
+               strings.NewReader(euv),
+       )
+       if err != nil {
+               return nil, err
+       }
+
+       request.Header.Add("Content-Type", "application/x-www-form-urlencoded")
+       request.Header.Add("Content-Length", strconv.Itoa(len(euv)))
+
+       return request, nil
+}
+
+// newDeviceCodeExchangeRequest builds a new DeviceCodeExchangeRequest wrapped 
in an
+// http.Request
+func (ce *TokenRetriever) newDeviceCodeExchangeRequest(
+       req DeviceCodeExchangeRequest) (*http.Request, error) {
+       uv := url.Values{}
+       uv.Set("grant_type", "urn:ietf:params:oauth:grant-type:device_code")
+       uv.Set("client_id", req.ClientID)
+       uv.Set("device_code", req.DeviceCode)
+       euv := uv.Encode()
+
+       request, err := http.NewRequest("POST",
+               req.TokenEndpoint,
+               strings.NewReader(euv),
+       )
+       if err != nil {
+               return nil, err
+       }
+
+       request.Header.Add("Content-Type", "application/x-www-form-urlencoded")
+       request.Header.Add("Content-Length", strconv.Itoa(len(euv)))
+
+       return request, nil
+}
+
+// newRefreshTokenRequest builds a new RefreshTokenRequest wrapped in an
+// http.Request
+func (ce *TokenRetriever) newRefreshTokenRequest(req 
RefreshTokenExchangeRequest) (*http.Request, error) {
+       uv := url.Values{}
+       uv.Set("grant_type", "refresh_token")
+       uv.Set("client_id", req.ClientID)
+       uv.Set("refresh_token", req.RefreshToken)
+
+       euv := uv.Encode()
+
+       request, err := http.NewRequest("POST",
+               req.TokenEndpoint,
+               strings.NewReader(euv),
+       )
+       if err != nil {
+               return nil, err
+       }
+
+       request.Header.Add("Content-Type", "application/x-www-form-urlencoded")
+       request.Header.Add("Content-Length", strconv.Itoa(len(euv)))
+
+       return request, nil
+}
+
+// newClientCredentialsRequest builds a new ClientCredentialsExchangeRequest 
wrapped in an
+// http.Request
+func (ce *TokenRetriever) newClientCredentialsRequest(req 
ClientCredentialsExchangeRequest) (*http.Request, error) {
+       uv := url.Values{}
+       uv.Set("grant_type", "client_credentials")
+       uv.Set("client_id", req.ClientID)
+       uv.Set("client_secret", req.ClientSecret)
+       uv.Set("audience", req.Audience)
+
+       euv := uv.Encode()
+
+       request, err := http.NewRequest("POST",
+               req.TokenEndpoint,
+               strings.NewReader(euv),
+       )
+       if err != nil {
+               return nil, err
+       }
+
+       request.Header.Add("Content-Type", "application/x-www-form-urlencoded")
+       request.Header.Add("Content-Length", strconv.Itoa(len(euv)))
+
+       return request, nil
+}
+
+// ExchangeCode uses the AuthCodeExchangeRequest to exchange an authorization
+// code for tokens
+func (ce *TokenRetriever) ExchangeCode(req AuthorizationCodeExchangeRequest) 
(*TokenResult, error) {
+       request, err := ce.newExchangeCodeRequest(req)
+       if err != nil {
+               return nil, err
+       }
+
+       response, err := ce.transport.Do(request)
+       if err != nil {
+               return nil, err
+       }
+
+       return ce.handleAuthTokensResponse(response)
+}
+
+// handleAuthTokensResponse takes care of checking an http.Response that has
+// auth tokens for errors and parsing the raw body to a TokenResult struct
+func (ce *TokenRetriever) handleAuthTokensResponse(resp *http.Response) 
(*TokenResult, error) {
+       if resp.Body != nil {
+               defer resp.Body.Close()
+       }
+
+       if resp.StatusCode < 200 || resp.StatusCode > 299 {
+               if resp.Header.Get("Content-Type") == "application/json" {
+                       er := TokenErrorResponse{}
+                       err := json.NewDecoder(resp.Body).Decode(&er)
+                       if err != nil {
+                               return nil, err
+                       }
+                       return nil, &TokenError{ErrorCode: er.Error, 
ErrorDescription: er.ErrorDescription}
+               }
+               return nil, fmt.Errorf("a non-success status code was received: 
%d", resp.StatusCode)
+       }
+
+       atr := AuthorizationTokenResponse{}
+       err := json.NewDecoder(resp.Body).Decode(&atr)
+       if err != nil {
+               return nil, err
+       }
+
+       return &TokenResult{
+               AccessToken:  atr.AccessToken,
+               IDToken:      atr.IDToken,
+               RefreshToken: atr.RefreshToken,
+               ExpiresIn:    atr.ExpiresIn,
+       }, nil
+}
+
+// ExchangeDeviceCode uses the DeviceCodeExchangeRequest to exchange a device
+// code for tokens
+func (ce *TokenRetriever) ExchangeDeviceCode(ctx context.Context, req 
DeviceCodeExchangeRequest) (*TokenResult, error) {
+       for {
+               request, err := ce.newDeviceCodeExchangeRequest(req)
+               if err != nil {
+                       return nil, err
+               }
+
+               response, err := ce.transport.Do(request)
+               if err != nil {
+                       return nil, err
+               }
+               token, err := ce.handleAuthTokensResponse(response)
+               if err == nil {
+                       return token, nil
+               }
+               terr, ok := err.(*TokenError)
+               if !ok {
+                       return nil, err
+               }
+               switch terr.ErrorCode {
+               case "expired_token":
+                       // The user has not authorized the device quickly 
enough, so the device_code has expired.
+                       return nil, fmt.Errorf("the device code has expired")
+               case "access_denied":
+                       // The user refused to authorize the device
+                       return nil, fmt.Errorf("the device was not authorized")
+               case "authorization_pending":
+                       // Still waiting for the user to take action
+               case "slow_down":
+                       // You are polling too fast
+               }
+
+               select {
+               case <-time.After(req.PollInterval):
+                       continue
+               case <-ctx.Done():
+                       return nil, errors.New("cancelled")
+               }
+       }
+}
+
+// ExchangeRefreshToken uses the RefreshTokenExchangeRequest to exchange a
+// refresh token for refreshed tokens
+func (ce *TokenRetriever) ExchangeRefreshToken(req 
RefreshTokenExchangeRequest) (*TokenResult, error) {
+       request, err := ce.newRefreshTokenRequest(req)
+       if err != nil {
+               return nil, err
+       }
+
+       response, err := ce.transport.Do(request)
+       if err != nil {
+               return nil, err
+       }
+
+       return ce.handleAuthTokensResponse(response)
+}
+
+// ExchangeClientCredentials uses the ClientCredentialsExchangeRequest to 
exchange
+// client credentials for tokens
+func (ce *TokenRetriever) ExchangeClientCredentials(req 
ClientCredentialsExchangeRequest) (*TokenResult, error) {
+       request, err := ce.newClientCredentialsRequest(req)
+       if err != nil {
+               return nil, err
+       }
+
+       response, err := ce.transport.Do(request)
+       if err != nil {
+               return nil, err
+       }
+
+       return ce.handleAuthTokensResponse(response)
+}
diff --git a/oauth2/authorization_tokenretriever_test.go 
b/oauth2/authorization_tokenretriever_test.go
new file mode 100644
index 0000000..b08f878
--- /dev/null
+++ b/oauth2/authorization_tokenretriever_test.go
@@ -0,0 +1,338 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package oauth2
+
+import (
+       "bytes"
+       "context"
+       "encoding/json"
+       "io/ioutil"
+       "net/http"
+       "strings"
+       "time"
+
+       . "github.com/onsi/ginkgo"
+       . "github.com/onsi/gomega"
+)
+
+type MockTransport struct {
+       Responses   []*http.Response
+       ReturnError error
+}
+
+var _ HTTPAuthTransport = &MockTransport{}
+
+func (t *MockTransport) Do(req *http.Request) (*http.Response, error) {
+       if len(t.Responses) > 0 {
+               r := t.Responses[0]
+               t.Responses = t.Responses[1:]
+               return r, nil
+       }
+       return nil, t.ReturnError
+}
+
+var _ = Describe("CodetokenExchanger", func() {
+       Describe("newExchangeCodeRequest", func() {
+               It("creates the request", func() {
+                       tokenRetriever := TokenRetriever{}
+                       exchangeRequest := AuthorizationCodeExchangeRequest{
+                               TokenEndpoint: "https://issuer/oauth/token";,
+                               ClientID:      "clientID",
+                               CodeVerifier:  "Verifier",
+                               Code:          "code",
+                               RedirectURI:   "https://redirect";,
+                       }
+
+                       result, err := 
tokenRetriever.newExchangeCodeRequest(exchangeRequest)
+
+                       result.ParseForm()
+
+                       Expect(err).To(BeNil())
+                       
Expect(result.FormValue("grant_type")).To(Equal("authorization_code"))
+                       
Expect(result.FormValue("client_id")).To(Equal("clientID"))
+                       
Expect(result.FormValue("code_verifier")).To(Equal("Verifier"))
+                       Expect(result.FormValue("code")).To(Equal("code"))
+                       
Expect(result.FormValue("redirect_uri")).To(Equal("https://redirect";))
+                       
Expect(result.URL.String()).To(Equal("https://issuer/oauth/token";))
+
+                       
Expect(result.Header.Get("Content-Type")).To(Equal("application/x-www-form-urlencoded"))
+                       
Expect(result.Header.Get("Content-Length")).To(Equal("117"))
+               })
+
+               It("returns an error when NewRequest returns an error", func() {
+                       tokenRetriever := TokenRetriever{}
+
+                       result, err := 
tokenRetriever.newExchangeCodeRequest(AuthorizationCodeExchangeRequest{
+                               TokenEndpoint: "://issuer/oauth/token",
+                       })
+
+                       Expect(result).To(BeNil())
+                       Expect(err.Error()).To(Equal("parse 
://issuer/oauth/token: missing protocol scheme"))
+               })
+       })
+
+       Describe("handleAuthTokensResponse", func() {
+               It("handles the response", func() {
+                       tokenRetriever := TokenRetriever{}
+                       response := buildResponse(200, 
AuthorizationTokenResponse{
+                               ExpiresIn:    1,
+                               AccessToken:  "myAccessToken",
+                               RefreshToken: "myRefreshToken",
+                       })
+
+                       result, err := 
tokenRetriever.handleAuthTokensResponse(response)
+
+                       Expect(err).To(BeNil())
+                       Expect(result).To(Equal(&TokenResult{
+                               ExpiresIn:    1,
+                               AccessToken:  "myAccessToken",
+                               RefreshToken: "myRefreshToken",
+                       }))
+               })
+
+               It("returns error when status code is not successful", func() {
+                       tokenRetriever := TokenRetriever{}
+                       response := buildResponse(500, nil)
+
+                       result, err := 
tokenRetriever.handleAuthTokensResponse(response)
+
+                       Expect(result).To(BeNil())
+                       Expect(err.Error()).To(Equal("a non-success status code 
was received: 500"))
+               })
+
+               It("returns typed error when response body contains error 
information", func() {
+                       errorBody := TokenErrorResponse{Error: "test", 
ErrorDescription: "test description"}
+                       tokenRetriever := TokenRetriever{}
+                       response := buildResponse(400, errorBody)
+
+                       result, err := 
tokenRetriever.handleAuthTokensResponse(response)
+
+                       Expect(result).To(BeNil())
+                       Expect(err).To(Equal(&TokenError{ErrorCode: "test", 
ErrorDescription: "test description"}))
+                       Expect(err.Error()).To(Equal("test description (test)"))
+               })
+
+               It("returns error when deserialization fails", func() {
+                       tokenRetriever := TokenRetriever{}
+                       response := buildResponse(200, "")
+
+                       result, err := 
tokenRetriever.handleAuthTokensResponse(response)
+                       Expect(result).To(BeNil())
+                       Expect(err.Error()).To(Equal(
+                               "json: cannot unmarshal string into Go value of 
type oauth2.AuthorizationTokenResponse"))
+               })
+       })
+
+       Describe("newRefreshTokenRequest", func() {
+               It("creates the request", func() {
+                       tokenRetriever := TokenRetriever{}
+                       exchangeRequest := RefreshTokenExchangeRequest{
+                               TokenEndpoint: "https://issuer/oauth/token";,
+                               ClientID:      "clientID",
+                               RefreshToken:  "refreshToken",
+                       }
+
+                       result, err := 
tokenRetriever.newRefreshTokenRequest(exchangeRequest)
+
+                       result.ParseForm()
+
+                       Expect(err).To(BeNil())
+                       
Expect(result.FormValue("grant_type")).To(Equal("refresh_token"))
+                       
Expect(result.FormValue("client_id")).To(Equal("clientID"))
+                       
Expect(result.FormValue("refresh_token")).To(Equal("refreshToken"))
+                       
Expect(result.URL.String()).To(Equal("https://issuer/oauth/token";))
+
+                       
Expect(result.Header.Get("Content-Type")).To(Equal("application/x-www-form-urlencoded"))
+                       
Expect(result.Header.Get("Content-Length")).To(Equal("70"))
+               })
+
+               It("returns an error when NewRequest returns an error", func() {
+                       tokenRetriever := TokenRetriever{}
+
+                       result, err := 
tokenRetriever.newRefreshTokenRequest(RefreshTokenExchangeRequest{
+                               TokenEndpoint: "://issuer/oauth/token",
+                       })
+
+                       Expect(result).To(BeNil())
+                       Expect(err.Error()).To(Equal("parse 
://issuer/oauth/token: missing protocol scheme"))
+               })
+       })
+
+       Describe("newClientCredentialsRequest", func() {
+               It("creates the request", func() {
+                       tokenRetriever := TokenRetriever{}
+                       exchangeRequest := ClientCredentialsExchangeRequest{
+                               TokenEndpoint: "https://issuer/oauth/token";,
+                               ClientID:      "clientID",
+                               ClientSecret:  "clientSecret",
+                               Audience:      "audience",
+                       }
+
+                       result, err := 
tokenRetriever.newClientCredentialsRequest(exchangeRequest)
+
+                       result.ParseForm()
+
+                       Expect(err).To(BeNil())
+                       
Expect(result.FormValue("grant_type")).To(Equal("client_credentials"))
+                       
Expect(result.FormValue("client_id")).To(Equal("clientID"))
+                       
Expect(result.FormValue("client_secret")).To(Equal("clientSecret"))
+                       
Expect(result.FormValue("audience")).To(Equal("audience"))
+                       
Expect(result.URL.String()).To(Equal("https://issuer/oauth/token";))
+
+                       
Expect(result.Header.Get("Content-Type")).To(Equal("application/x-www-form-urlencoded"))
+                       
Expect(result.Header.Get("Content-Length")).To(Equal("93"))
+               })
+
+               It("returns an error when NewRequest returns an error", func() {
+                       tokenRetriever := TokenRetriever{}
+
+                       result, err := 
tokenRetriever.newClientCredentialsRequest(ClientCredentialsExchangeRequest{
+                               TokenEndpoint: "://issuer/oauth/token",
+                       })
+
+                       Expect(result).To(BeNil())
+                       Expect(err.Error()).To(Equal("parse 
://issuer/oauth/token: missing protocol scheme"))
+               })
+       })
+
+       Describe("newDeviceCodeExchangeRequest", func() {
+               It("creates the request", func() {
+                       tokenRetriever := TokenRetriever{}
+                       exchangeRequest := DeviceCodeExchangeRequest{
+                               TokenEndpoint: "https://issuer/oauth/token";,
+                               ClientID:      "clientID",
+                               DeviceCode:    "deviceCode",
+                               PollInterval:  time.Duration(5) * time.Second,
+                       }
+
+                       result, err := 
tokenRetriever.newDeviceCodeExchangeRequest(exchangeRequest)
+
+                       result.ParseForm()
+
+                       Expect(err).To(BeNil())
+                       
Expect(result.FormValue("grant_type")).To(Equal("urn:ietf:params:oauth:grant-type:device_code"))
+                       
Expect(result.FormValue("client_id")).To(Equal("clientID"))
+                       
Expect(result.FormValue("device_code")).To(Equal("deviceCode"))
+                       
Expect(result.URL.String()).To(Equal("https://issuer/oauth/token";))
+
+                       
Expect(result.Header.Get("Content-Type")).To(Equal("application/x-www-form-urlencoded"))
+                       
Expect(result.Header.Get("Content-Length")).To(Equal("107"))
+               })
+
+               It("returns an error when NewRequest returns an error", func() {
+                       tokenRetriever := TokenRetriever{}
+
+                       result, err := 
tokenRetriever.newClientCredentialsRequest(ClientCredentialsExchangeRequest{
+                               TokenEndpoint: "://issuer/oauth/token",
+                       })
+
+                       Expect(result).To(BeNil())
+                       Expect(err.Error()).To(Equal("parse 
://issuer/oauth/token: missing protocol scheme"))
+               })
+       })
+
+       Describe("ExchangeDeviceCode", func() {
+               var mockTransport *MockTransport
+               var tokenRetriever *TokenRetriever
+               var exchangeRequest DeviceCodeExchangeRequest
+               var tokenResult TokenResult
+
+               BeforeEach(func() {
+                       mockTransport = &MockTransport{}
+                       tokenRetriever = &TokenRetriever{
+                               transport: mockTransport,
+                       }
+                       exchangeRequest = DeviceCodeExchangeRequest{
+                               TokenEndpoint: "https://issuer/oauth/token";,
+                               ClientID:      "clientID",
+                               DeviceCode:    "deviceCode",
+                               PollInterval:  time.Duration(1) * time.Second,
+                       }
+                       tokenResult = TokenResult{
+                               ExpiresIn:    1,
+                               AccessToken:  "myAccessToken",
+                               RefreshToken: "myRefreshToken",
+                       }
+               })
+
+               It("returns a token", func() {
+               })
+
+               It("supports cancellation", func() {
+                       mockTransport.Responses = []*http.Response{
+                               buildResponse(400, 
&TokenErrorResponse{"authorization_pending", ""}),
+                       }
+                       ctx, cancel := context.WithCancel(context.Background())
+                       cancel()
+                       _, err := tokenRetriever.ExchangeDeviceCode(ctx, 
exchangeRequest)
+                       Expect(err).ToNot(BeNil())
+                       Expect(err.Error()).To(Equal("cancelled"))
+               })
+
+               It("implements authorization_pending and slow_down", func() {
+                       startTime := time.Now()
+                       mockTransport.Responses = []*http.Response{
+                               buildResponse(400, 
&TokenErrorResponse{"authorization_pending", ""}),
+                               buildResponse(400, 
&TokenErrorResponse{"authorization_pending", ""}),
+                               buildResponse(400, 
&TokenErrorResponse{"slow_down", ""}),
+                               buildResponse(200, &tokenResult),
+                       }
+                       token, err := 
tokenRetriever.ExchangeDeviceCode(context.Background(), exchangeRequest)
+                       Expect(err).To(BeNil())
+                       Expect(token).To(Equal(&tokenResult))
+                       endTime := time.Now()
+                       Expect(endTime.Sub(startTime)).To(BeNumerically(">", 
exchangeRequest.PollInterval*3))
+               })
+
+               It("implements expired_token", func() {
+                       mockTransport.Responses = []*http.Response{
+                               buildResponse(400, 
&TokenErrorResponse{"expired_token", ""}),
+                       }
+                       _, err := 
tokenRetriever.ExchangeDeviceCode(context.Background(), exchangeRequest)
+                       Expect(err).ToNot(BeNil())
+                       Expect(err.Error()).To(Equal("the device code has 
expired"))
+               })
+
+               It("implements access_denied", func() {
+                       mockTransport.Responses = []*http.Response{
+                               buildResponse(400, 
&TokenErrorResponse{"access_denied", ""}),
+                       }
+                       _, err := 
tokenRetriever.ExchangeDeviceCode(context.Background(), exchangeRequest)
+                       Expect(err).ToNot(BeNil())
+                       Expect(err.Error()).To(Equal("the device was not 
authorized"))
+               })
+       })
+})
+
+func buildResponse(statusCode int, body interface{}) *http.Response {
+       b, err := json.Marshal(body)
+       if err != nil {
+               panic(err)
+       }
+
+       resp := &http.Response{
+               StatusCode: statusCode,
+               Header:     map[string][]string{},
+               Body:       ioutil.NopCloser(bytes.NewReader(b)),
+       }
+       if strings.HasPrefix(string(b), "{") {
+               resp.Header.Add("Content-Type", "application/json")
+       }
+
+       return resp
+}
diff --git a/oauth2/cache/cache.go b/oauth2/cache/cache.go
new file mode 100644
index 0000000..3d90bfa
--- /dev/null
+++ b/oauth2/cache/cache.go
@@ -0,0 +1,142 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package cache
+
+import (
+       "fmt"
+       "sync"
+       "time"
+
+       "github.com/apache/pulsar-client-go/oauth2"
+       "github.com/apache/pulsar-client-go/oauth2/store"
+
+       xoauth2 "golang.org/x/oauth2"
+       "github.com/apache/pulsar-client-go/oauth2/clock"
+)
+
+// A CachingTokenSource is anything that can return a token, and is backed by 
a cache.
+type CachingTokenSource interface {
+       xoauth2.TokenSource
+
+       // InvalidateToken is called when the token is rejected by the resource 
server.
+       InvalidateToken() error
+}
+
+const (
+       // expiryDelta adjusts the token TTL to avoid using tokens which are 
almost expired
+       expiryDelta = time.Duration(60) * time.Second
+)
+
+// tokenCache implements a cache for the token associated with a specific 
audience.
+// it interacts with the store when the access token is near expiration or 
invalidated.
+// it is advisable to use a token cache instance per audience.
+type tokenCache struct {
+       clock     clock.Clock
+       lock      sync.Mutex
+       store     store.Store
+       audience  string
+       refresher oauth2.AuthorizationGrantRefresher
+       token     *xoauth2.Token
+}
+
+func NewDefaultTokenCache(store store.Store, audience string,
+       refresher oauth2.AuthorizationGrantRefresher) (CachingTokenSource, 
error) {
+       cache := &tokenCache{
+               clock:     clock.RealClock{},
+               store:     store,
+               audience:  audience,
+               refresher: refresher,
+       }
+       return cache, nil
+}
+
+var _ CachingTokenSource = &tokenCache{}
+
+// Token returns a valid access token, if available.
+func (t *tokenCache) Token() (*xoauth2.Token, error) {
+       t.lock.Lock()
+       defer t.lock.Unlock()
+
+       // use the cached access token if it isn't expired
+       if t.token != nil && t.validateAccessToken(*t.token) {
+               return t.token, nil
+       }
+
+       // load from the store and use the access token if it isn't expired
+       grant, err := t.store.LoadGrant(t.audience)
+       if err != nil {
+               return nil, fmt.Errorf("LoadGrant: %v", err)
+       }
+       t.token = grant.Token
+       if t.token != nil && t.validateAccessToken(*t.token) {
+               return t.token, nil
+       }
+
+       // obtain and cache a fresh access token
+       grant, err = t.refresher.Refresh(grant)
+       if err != nil {
+               return nil, fmt.Errorf("RefreshGrant: %v", err)
+       }
+       t.token = grant.Token
+       err = t.store.SaveGrant(t.audience, *grant)
+       if err != nil {
+               // TODO log rather than throw
+               return nil, fmt.Errorf("SaveGrant: %v", err)
+       }
+
+       return t.token, nil
+}
+
+// InvalidateToken clears the access token (likely due to a response from the 
resource server).
+// Note that the token within the grant may contain a refresh token which 
should survive.
+func (t *tokenCache) InvalidateToken() error {
+       t.lock.Lock()
+       defer t.lock.Unlock()
+
+       previous := t.token
+       t.token = nil
+
+       // clear from the store the access token that was returned earlier 
(unless the store has since been updated)
+       if previous == nil || previous.AccessToken == "" {
+               return nil
+       }
+       grant, err := t.store.LoadGrant(t.audience)
+       if err != nil {
+               return fmt.Errorf("LoadGrant: %v", err)
+       }
+       if grant.Token != nil && grant.Token.AccessToken == 
previous.AccessToken {
+               grant.Token.Expiry = time.Unix(0, 0).Add(expiryDelta)
+               err = t.store.SaveGrant(t.audience, *grant)
+               if err != nil {
+                       // TODO log rather than throw
+                       return fmt.Errorf("SaveGrant: %v", err)
+               }
+       }
+       return nil
+}
+
+// validateAccessToken checks the validity of the cached access token
+func (t *tokenCache) validateAccessToken(token xoauth2.Token) bool {
+       if token.AccessToken == "" {
+               return false
+       }
+       if !token.Expiry.IsZero() && 
t.clock.Now().After(token.Expiry.Round(0).Add(-expiryDelta)) {
+               return false
+       }
+       return true
+}
diff --git a/oauth2/client_credentials_flow.go 
b/oauth2/client_credentials_flow.go
new file mode 100644
index 0000000..808b09b
--- /dev/null
+++ b/oauth2/client_credentials_flow.go
@@ -0,0 +1,158 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package oauth2
+
+import (
+       "net/http"
+
+       "github.com/apache/pulsar-client-go/oauth2/clock"
+
+       "github.com/pkg/errors"
+)
+
+// ClientCredentialsFlow takes care of the mechanics needed for getting an 
access
+// token using the OAuth 2.0 "Client Credentials Flow"
+type ClientCredentialsFlow struct {
+       options                ClientCredentialsFlowOptions
+       oidcWellKnownEndpoints OIDCWellKnownEndpoints
+       keyfile                *KeyFile
+       exchanger              ClientCredentialsExchanger
+       clock                  clock.Clock
+}
+
+// ClientCredentialsProvider abstracts getting client credentials
+type ClientCredentialsProvider interface {
+       GetClientCredentials() (*KeyFile, error)
+}
+
+// ClientCredentialsExchanger abstracts exchanging client credentials for 
tokens
+type ClientCredentialsExchanger interface {
+       ExchangeClientCredentials(req ClientCredentialsExchangeRequest) 
(*TokenResult, error)
+}
+
+type ClientCredentialsFlowOptions struct {
+       KeyFile          string
+       AdditionalScopes []string
+}
+
+func newClientCredentialsFlow(
+       options ClientCredentialsFlowOptions,
+       keyfile *KeyFile,
+       oidcWellKnownEndpoints OIDCWellKnownEndpoints,
+       exchanger ClientCredentialsExchanger,
+       clock clock.Clock) *ClientCredentialsFlow {
+       return &ClientCredentialsFlow{
+               options:                options,
+               oidcWellKnownEndpoints: oidcWellKnownEndpoints,
+               keyfile:                keyfile,
+               exchanger:              exchanger,
+               clock:                  clock,
+       }
+}
+
+// NewDefaultClientCredentialsFlow provides an easy way to build up a default
+// client credentials flow with all the correct configuration.
+func NewDefaultClientCredentialsFlow(options ClientCredentialsFlowOptions) 
(*ClientCredentialsFlow, error) {
+
+       credsProvider := 
NewClientCredentialsProviderFromKeyFile(options.KeyFile)
+       keyFile, err := credsProvider.GetClientCredentials()
+       if err != nil {
+               return nil, errors.Wrap(err, "could not get client credentials")
+       }
+
+       wellKnownEndpoints, err := 
GetOIDCWellKnownEndpointsFromIssuerURL(keyFile.IssuerURL)
+       if err != nil {
+               return nil, err
+       }
+
+       tokenRetriever := NewTokenRetriever(&http.Client{})
+
+       return newClientCredentialsFlow(
+               options,
+               keyFile,
+               *wellKnownEndpoints,
+               tokenRetriever,
+               clock.RealClock{}), nil
+}
+
+var _ Flow = &ClientCredentialsFlow{}
+
+func (c *ClientCredentialsFlow) Authorize(audience string) 
(*AuthorizationGrant, error) {
+       var err error
+       grant := &AuthorizationGrant{
+               Type:              GrantTypeClientCredentials,
+               Audience:          audience,
+               ClientID:          c.keyfile.ClientID,
+               ClientCredentials: c.keyfile,
+               TokenEndpoint:     c.oidcWellKnownEndpoints.TokenEndpoint,
+       }
+
+       // test the credentials and obtain an initial access token
+       refresher := &ClientCredentialsGrantRefresher{
+               exchanger: c.exchanger,
+               clock:     c.clock,
+       }
+       grant, err = refresher.Refresh(grant)
+       if err != nil {
+               return nil, errors.Wrap(err, "authentication failed using 
client credentials")
+       }
+       return grant, nil
+}
+
+type ClientCredentialsGrantRefresher struct {
+       exchanger ClientCredentialsExchanger
+       clock     clock.Clock
+}
+
+func NewDefaultClientCredentialsGrantRefresher(clock clock.Clock) 
(*ClientCredentialsGrantRefresher, error) {
+       tokenRetriever := NewTokenRetriever(&http.Client{})
+       return &ClientCredentialsGrantRefresher{
+               exchanger: tokenRetriever,
+               clock:     clock,
+       }, nil
+}
+
+var _ AuthorizationGrantRefresher = &ClientCredentialsGrantRefresher{}
+
+func (g *ClientCredentialsGrantRefresher) Refresh(grant *AuthorizationGrant) 
(*AuthorizationGrant, error) {
+       if grant.Type != GrantTypeClientCredentials {
+               return nil, errors.New("unsupported grant type")
+       }
+
+       exchangeRequest := ClientCredentialsExchangeRequest{
+               TokenEndpoint: grant.TokenEndpoint,
+               Audience:      grant.Audience,
+               ClientID:      grant.ClientCredentials.ClientID,
+               ClientSecret:  grant.ClientCredentials.ClientSecret,
+       }
+       tr, err := g.exchanger.ExchangeClientCredentials(exchangeRequest)
+       if err != nil {
+               return nil, errors.Wrap(err, "could not exchange client 
credentials")
+       }
+
+       token := convertToOAuth2Token(tr, g.clock)
+       grant = &AuthorizationGrant{
+               Type:              GrantTypeClientCredentials,
+               Audience:          grant.Audience,
+               ClientID:          grant.ClientID,
+               ClientCredentials: grant.ClientCredentials,
+               TokenEndpoint:     grant.TokenEndpoint,
+               Token:             &token,
+       }
+       return grant, nil
+}
diff --git a/oauth2/client_credentials_flow_test.go 
b/oauth2/client_credentials_flow_test.go
new file mode 100644
index 0000000..6e123ce
--- /dev/null
+++ b/oauth2/client_credentials_flow_test.go
@@ -0,0 +1,183 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package oauth2
+
+import (
+       "errors"
+       "time"
+
+       "github.com/apache/pulsar-client-go/oauth2/clock"
+       "github.com/apache/pulsar-client-go/oauth2/clock/testing"
+
+       . "github.com/onsi/ginkgo"
+       . "github.com/onsi/gomega"
+)
+
+type MockClientCredentialsProvider struct {
+       Called                  bool
+       ClientCredentialsResult *KeyFile
+       ReturnsError            error
+}
+
+func (m *MockClientCredentialsProvider) GetClientCredentials() (*KeyFile, 
error) {
+       m.Called = true
+       return m.ClientCredentialsResult, m.ReturnsError
+}
+
+var _ ClientCredentialsProvider = &MockClientCredentialsProvider{}
+
+var clientCredentials = KeyFile{
+       Type:         KeyFileTypeServiceAccount,
+       ClientID:     "test_clientID",
+       ClientSecret: "test_clientSecret",
+       ClientEmail:  "test_clientEmail",
+       IssuerURL:    "http://issuer";,
+}
+
+var _ = Describe("ClientCredentialsFlow", func() {
+       Describe("Authorize", func() {
+
+               var mockClock clock.Clock
+               var mockTokenExchanger *MockTokenExchanger
+
+               BeforeEach(func() {
+                       mockClock = testing.NewFakeClock(time.Unix(0, 0))
+                       expectedTokens := TokenResult{AccessToken: 
"accessToken", RefreshToken: "refreshToken", ExpiresIn: 1234}
+                       mockTokenExchanger = &MockTokenExchanger{
+                               ReturnsTokens: &expectedTokens,
+                       }
+               })
+
+               It("invokes TokenExchanger with credentials", func() {
+                       provider := newClientCredentialsFlow(
+                               ClientCredentialsFlowOptions{
+                                       KeyFile: "test_keyfile",
+                               },
+                               &clientCredentials,
+                               oidcEndpoints,
+                               mockTokenExchanger,
+                               mockClock,
+                       )
+
+                       _, err := provider.Authorize("test_audience")
+                       Expect(err).ToNot(HaveOccurred())
+                       
Expect(mockTokenExchanger.CalledWithRequest).To(Equal(&ClientCredentialsExchangeRequest{
+                               TokenEndpoint: oidcEndpoints.TokenEndpoint,
+                               ClientID:      clientCredentials.ClientID,
+                               ClientSecret:  clientCredentials.ClientSecret,
+                               Audience:      "test_audience",
+                       }))
+               })
+
+               It("returns TokensResult from TokenExchanger", func() {
+                       provider := newClientCredentialsFlow(
+                               ClientCredentialsFlowOptions{
+                                       KeyFile: "test_keyfile",
+                               },
+                               &clientCredentials,
+                               oidcEndpoints,
+                               mockTokenExchanger,
+                               mockClock,
+                       )
+
+                       grant, err := provider.Authorize("test_audience")
+                       Expect(err).ToNot(HaveOccurred())
+                       expected := 
convertToOAuth2Token(mockTokenExchanger.ReturnsTokens, mockClock)
+                       Expect(*grant.Token).To(Equal(expected))
+               })
+
+               It("returns an error if token exchanger errors", func() {
+                       mockTokenExchanger.ReturnsError = 
errors.New("someerror")
+                       mockTokenExchanger.ReturnsTokens = nil
+
+                       provider := newClientCredentialsFlow(
+                               ClientCredentialsFlowOptions{
+                                       KeyFile: "test_keyfile",
+                               },
+                               &clientCredentials,
+                               oidcEndpoints,
+                               mockTokenExchanger,
+                               mockClock,
+                       )
+
+                       _, err := provider.Authorize("test_audience")
+                       Expect(err.Error()).To(Equal("authentication failed 
using client credentials: " +
+                               "could not exchange client credentials: 
someerror"))
+               })
+       })
+})
+
+var _ = Describe("ClientCredentialsGrantRefresher", func() {
+
+       Describe("Refresh", func() {
+               var mockClock clock.Clock
+               var mockTokenExchanger *MockTokenExchanger
+
+               BeforeEach(func() {
+                       mockClock = testing.NewFakeClock(time.Unix(0, 0))
+                       expectedTokens := TokenResult{AccessToken: 
"accessToken", RefreshToken: "refreshToken", ExpiresIn: 1234}
+                       mockTokenExchanger = &MockTokenExchanger{
+                               ReturnsTokens: &expectedTokens,
+                       }
+               })
+
+               It("invokes TokenExchanger with credentials", func() {
+                       refresher := &ClientCredentialsGrantRefresher{
+                               clock:     mockClock,
+                               exchanger: mockTokenExchanger,
+                       }
+                       og := &AuthorizationGrant{
+                               Type:              GrantTypeClientCredentials,
+                               Audience:          "test_audience",
+                               ClientCredentials: &clientCredentials,
+                               TokenEndpoint:     oidcEndpoints.TokenEndpoint,
+                               Token:             nil,
+                       }
+                       _, err := refresher.Refresh(og)
+                       Expect(err).ToNot(HaveOccurred())
+                       
Expect(mockTokenExchanger.CalledWithRequest).To(Equal(&ClientCredentialsExchangeRequest{
+                               TokenEndpoint: oidcEndpoints.TokenEndpoint,
+                               ClientID:      clientCredentials.ClientID,
+                               ClientSecret:  clientCredentials.ClientSecret,
+                               Audience:      og.Audience,
+                       }))
+               })
+
+               It("returns a valid grant", func() {
+                       refresher := &ClientCredentialsGrantRefresher{
+                               clock:     mockClock,
+                               exchanger: mockTokenExchanger,
+                       }
+                       og := &AuthorizationGrant{
+                               Type:              GrantTypeClientCredentials,
+                               Audience:          "test_audience",
+                               ClientCredentials: &clientCredentials,
+                               TokenEndpoint:     oidcEndpoints.TokenEndpoint,
+                               Token:             nil,
+                       }
+                       ng, err := refresher.Refresh(og)
+                       Expect(err).ToNot(HaveOccurred())
+                       Expect(ng.Audience).To(Equal("test_audience"))
+                       Expect(ng.ClientID).To(Equal(""))
+                       
Expect(*ng.ClientCredentials).To(Equal(clientCredentials))
+                       
Expect(ng.TokenEndpoint).To(Equal(oidcEndpoints.TokenEndpoint))
+                       expected := 
convertToOAuth2Token(mockTokenExchanger.ReturnsTokens, mockClock)
+                       Expect(*ng.Token).To(Equal(expected))
+               })
+       })
+})
diff --git a/oauth2/client_credentials_provider.go 
b/oauth2/client_credentials_provider.go
new file mode 100644
index 0000000..2716376
--- /dev/null
+++ b/oauth2/client_credentials_provider.go
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package oauth2
+
+import (
+       "encoding/json"
+       "fmt"
+       "io/ioutil"
+)
+
+const (
+       KeyFileTypeServiceAccount = "sn_service_account"
+)
+
+type KeyFileProvider struct {
+       KeyFile string
+}
+
+type KeyFile struct {
+       Type         string `json:"type"`
+       ClientID     string `json:"client_id"`
+       ClientSecret string `json:"client_secret"`
+       ClientEmail  string `json:"client_email"`
+       IssuerURL    string `json:"issuer_url"`
+}
+
+func NewClientCredentialsProviderFromKeyFile(keyFile string) *KeyFileProvider {
+       return &KeyFileProvider{
+               KeyFile: keyFile,
+       }
+}
+
+var _ ClientCredentialsProvider = &KeyFileProvider{}
+
+func (k *KeyFileProvider) GetClientCredentials() (*KeyFile, error) {
+       keyFile, err := ioutil.ReadFile(k.KeyFile)
+       if err != nil {
+               return nil, err
+       }
+
+       var v KeyFile
+       err = json.Unmarshal(keyFile, &v)
+       if err != nil {
+               return nil, err
+       }
+       if v.Type != KeyFileTypeServiceAccount {
+               return nil, fmt.Errorf("open %s: unsupported format", k.KeyFile)
+       }
+
+       return &v, nil
+}
diff --git a/oauth2/clock/clock.go b/oauth2/clock/clock.go
new file mode 100644
index 0000000..f170d9a
--- /dev/null
+++ b/oauth2/clock/clock.go
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package clock
+
+import "time"
+
+// Clock allows for injecting fake or real clocks into code that
+// needs to do arbitrary things based on time.
+type Clock interface {
+       Now() time.Time
+       Since(time.Time) time.Duration
+       After(d time.Duration) <-chan time.Time
+       NewTimer(d time.Duration) Timer
+       Sleep(d time.Duration)
+       Tick(d time.Duration) <-chan time.Time
+}
+
+var _ = Clock(RealClock{})
+
+// RealClock really calls time.Now()
+type RealClock struct{}
+
+// Now returns the current time.
+func (RealClock) Now() time.Time {
+       return time.Now()
+}
+
+// Since returns time since the specified timestamp.
+func (RealClock) Since(ts time.Time) time.Duration {
+       return time.Since(ts)
+}
+
+// After is the same as time.After(d).
+func (RealClock) After(d time.Duration) <-chan time.Time {
+       return time.After(d)
+}
+
+// NewTimer is the same as time.NewTimer(d)
+func (RealClock) NewTimer(d time.Duration) Timer {
+       return &realTimer{
+               timer: time.NewTimer(d),
+       }
+}
+
+// Tick is the same as time.Tick(d)
+func (RealClock) Tick(d time.Duration) <-chan time.Time {
+       return time.Tick(d)
+}
+
+// Sleep is the same as time.Sleep(d)
+func (RealClock) Sleep(d time.Duration) {
+       time.Sleep(d)
+}
+
+// Timer allows for injecting fake or real timers into code that
+// needs to do arbitrary things based on time.
+type Timer interface {
+       C() <-chan time.Time
+       Stop() bool
+       Reset(d time.Duration) bool
+}
+
+var _ = Timer(&realTimer{})
+
+// realTimer is backed by an actual time.Timer.
+type realTimer struct {
+       timer *time.Timer
+}
+
+// C returns the underlying timer's channel.
+func (r *realTimer) C() <-chan time.Time {
+       return r.timer.C
+}
+
+// Stop calls Stop() on the underlying timer.
+func (r *realTimer) Stop() bool {
+       return r.timer.Stop()
+}
+
+// Reset calls Reset() on the underlying timer.
+func (r *realTimer) Reset(d time.Duration) bool {
+       return r.timer.Reset(d)
+}
diff --git a/oauth2/clock/testing/fake_clock.go 
b/oauth2/clock/testing/fake_clock.go
new file mode 100644
index 0000000..6fcbf4c
--- /dev/null
+++ b/oauth2/clock/testing/fake_clock.go
@@ -0,0 +1,275 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package testing
+
+import (
+       "sync"
+       "time"
+
+       "github.com/apache/pulsar-client-go/oauth2/clock"
+)
+
+var (
+       _ = clock.Clock(&FakeClock{})
+       _ = clock.Clock(&IntervalClock{})
+)
+
+// FakeClock implements clock.Clock, but returns an arbitrary time.
+type FakeClock struct {
+       lock sync.RWMutex
+       time time.Time
+
+       // waiters are waiting for the fake time to pass their specified time
+       waiters []*fakeClockWaiter
+}
+
+type fakeClockWaiter struct {
+       targetTime    time.Time
+       stepInterval  time.Duration
+       skipIfBlocked bool
+       destChan      chan time.Time
+       fired         bool
+}
+
+// NewFakeClock constructs a fake clock set to the provided time.
+func NewFakeClock(t time.Time) *FakeClock {
+       return &FakeClock{
+               time: t,
+       }
+}
+
+// Now returns f's time.
+func (f *FakeClock) Now() time.Time {
+       f.lock.RLock()
+       defer f.lock.RUnlock()
+       return f.time
+}
+
+// Since returns time since the time in f.
+func (f *FakeClock) Since(ts time.Time) time.Duration {
+       f.lock.RLock()
+       defer f.lock.RUnlock()
+       return f.time.Sub(ts)
+}
+
+// After is the fake version of time.After(d).
+func (f *FakeClock) After(d time.Duration) <-chan time.Time {
+       f.lock.Lock()
+       defer f.lock.Unlock()
+       stopTime := f.time.Add(d)
+       ch := make(chan time.Time, 1) // Don't block!
+       f.waiters = append(f.waiters, &fakeClockWaiter{
+               targetTime: stopTime,
+               destChan:   ch,
+       })
+       return ch
+}
+
+// NewTimer constructs a fake timer, akin to time.NewTimer(d).
+func (f *FakeClock) NewTimer(d time.Duration) clock.Timer {
+       f.lock.Lock()
+       defer f.lock.Unlock()
+       stopTime := f.time.Add(d)
+       ch := make(chan time.Time, 1) // Don't block!
+       timer := &fakeTimer{
+               fakeClock: f,
+               waiter: fakeClockWaiter{
+                       targetTime: stopTime,
+                       destChan:   ch,
+               },
+       }
+       f.waiters = append(f.waiters, &timer.waiter)
+       return timer
+}
+
+// Tick constructs a fake ticker, akin to time.Tick
+func (f *FakeClock) Tick(d time.Duration) <-chan time.Time {
+       if d <= 0 {
+               return nil
+       }
+       f.lock.Lock()
+       defer f.lock.Unlock()
+       tickTime := f.time.Add(d)
+       ch := make(chan time.Time, 1) // hold one tick
+       f.waiters = append(f.waiters, &fakeClockWaiter{
+               targetTime:    tickTime,
+               stepInterval:  d,
+               skipIfBlocked: true,
+               destChan:      ch,
+       })
+
+       return ch
+}
+
+// Step moves the clock by Duration and notifies anyone that's called After,
+// Tick, or NewTimer.
+func (f *FakeClock) Step(d time.Duration) {
+       f.lock.Lock()
+       defer f.lock.Unlock()
+       f.setTimeLocked(f.time.Add(d))
+}
+
+// SetTime sets the time.
+func (f *FakeClock) SetTime(t time.Time) {
+       f.lock.Lock()
+       defer f.lock.Unlock()
+       f.setTimeLocked(t)
+}
+
+// Actually changes the time and checks any waiters. f must be write-locked.
+func (f *FakeClock) setTimeLocked(t time.Time) {
+       f.time = t
+       newWaiters := make([]*fakeClockWaiter, 0, len(f.waiters))
+       for i := range f.waiters {
+               w := f.waiters[i]
+               if !w.targetTime.After(t) {
+
+                       if w.skipIfBlocked {
+                               select {
+                               case w.destChan <- t:
+                                       w.fired = true
+                               default:
+                               }
+                       } else {
+                               w.destChan <- t
+                               w.fired = true
+                       }
+
+                       if w.stepInterval > 0 {
+                               for !w.targetTime.After(t) {
+                                       w.targetTime = 
w.targetTime.Add(w.stepInterval)
+                               }
+                               newWaiters = append(newWaiters, w)
+                       }
+
+               } else {
+                       newWaiters = append(newWaiters, f.waiters[i])
+               }
+       }
+       f.waiters = newWaiters
+}
+
+// HasWaiters returns true if After has been called on f but not yet satisfied 
(so you can
+// write race-free tests).
+func (f *FakeClock) HasWaiters() bool {
+       f.lock.RLock()
+       defer f.lock.RUnlock()
+       return len(f.waiters) > 0
+}
+
+// Sleep is akin to time.Sleep
+func (f *FakeClock) Sleep(d time.Duration) {
+       f.Step(d)
+}
+
+// IntervalClock implements clock.Clock, but each invocation of Now steps the 
clock forward the specified duration
+type IntervalClock struct {
+       Time     time.Time
+       Duration time.Duration
+}
+
+// Now returns i's time.
+func (i *IntervalClock) Now() time.Time {
+       i.Time = i.Time.Add(i.Duration)
+       return i.Time
+}
+
+// Since returns time since the time in i.
+func (i *IntervalClock) Since(ts time.Time) time.Duration {
+       return i.Time.Sub(ts)
+}
+
+// After is unimplemented, will panic.
+// TODO: make interval clock use FakeClock so this can be implemented.
+func (*IntervalClock) After(d time.Duration) <-chan time.Time {
+       panic("IntervalClock doesn't implement After")
+}
+
+// NewTimer is unimplemented, will panic.
+// TODO: make interval clock use FakeClock so this can be implemented.
+func (*IntervalClock) NewTimer(d time.Duration) clock.Timer {
+       panic("IntervalClock doesn't implement NewTimer")
+}
+
+// Tick is unimplemented, will panic.
+// TODO: make interval clock use FakeClock so this can be implemented.
+func (*IntervalClock) Tick(d time.Duration) <-chan time.Time {
+       panic("IntervalClock doesn't implement Tick")
+}
+
+// Sleep is unimplemented, will panic.
+func (*IntervalClock) Sleep(d time.Duration) {
+       panic("IntervalClock doesn't implement Sleep")
+}
+
+var _ = clock.Timer(&fakeTimer{})
+
+// fakeTimer implements clock.Timer based on a FakeClock.
+type fakeTimer struct {
+       fakeClock *FakeClock
+       waiter    fakeClockWaiter
+}
+
+// C returns the channel that notifies when this timer has fired.
+func (f *fakeTimer) C() <-chan time.Time {
+       return f.waiter.destChan
+}
+
+// Stop stops the timer and returns true if the timer has not yet fired, or 
false otherwise.
+func (f *fakeTimer) Stop() bool {
+       f.fakeClock.lock.Lock()
+       defer f.fakeClock.lock.Unlock()
+
+       newWaiters := make([]*fakeClockWaiter, 0, len(f.fakeClock.waiters))
+       for i := range f.fakeClock.waiters {
+               w := f.fakeClock.waiters[i]
+               if w != &f.waiter {
+                       newWaiters = append(newWaiters, w)
+               }
+       }
+
+       f.fakeClock.waiters = newWaiters
+
+       return !f.waiter.fired
+}
+
+// Reset resets the timer to the fake clock's "now" + d. It returns true if 
the timer has not yet
+// fired, or false otherwise.
+func (f *fakeTimer) Reset(d time.Duration) bool {
+       f.fakeClock.lock.Lock()
+       defer f.fakeClock.lock.Unlock()
+
+       active := !f.waiter.fired
+
+       f.waiter.fired = false
+       f.waiter.targetTime = f.fakeClock.time.Add(d)
+
+       var isWaiting bool
+       for i := range f.fakeClock.waiters {
+               w := f.fakeClock.waiters[i]
+               if w == &f.waiter {
+                       isWaiting = true
+                       break
+               }
+       }
+       if !isWaiting {
+               f.fakeClock.waiters = append(f.fakeClock.waiters, &f.waiter)
+       }
+
+       return active
+}
diff --git a/oauth2/config_tokenprovider.go b/oauth2/config_tokenprovider.go
new file mode 100644
index 0000000..627749f
--- /dev/null
+++ b/oauth2/config_tokenprovider.go
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package oauth2
+
+import "fmt"
+
+type configProvider interface {
+       GetTokens(identifier string) (string, string)
+       SaveTokens(identifier, accessToken, refreshToken string)
+}
+
+// ConfigBackedCachingProvider wraps a configProvider in order to conform to
+// the cachingProvider interface
+type ConfigBackedCachingProvider struct {
+       identifier string
+       config     configProvider
+}
+
+// NewConfigBackedCachingProvider builds and returns a CachingTokenProvider
+// that utilizes a configProvider to cache tokens
+func NewConfigBackedCachingProvider(clientID, audience string, config 
configProvider) *ConfigBackedCachingProvider {
+       return &ConfigBackedCachingProvider{
+               identifier: fmt.Sprintf("%s-%s", clientID, audience),
+               config:     config,
+       }
+}
+
+// GetTokens gets the tokens from the cache and returns them as a TokenResult
+func (c *ConfigBackedCachingProvider) GetTokens() (*TokenResult, error) {
+       accessToken, refreshToken := c.config.GetTokens(c.identifier)
+       return &TokenResult{
+               AccessToken:  accessToken,
+               RefreshToken: refreshToken,
+       }, nil
+}
+
+// CacheTokens caches the id and refresh token from TokenResult in the
+// configProvider
+func (c *ConfigBackedCachingProvider) CacheTokens(toCache *TokenResult) error {
+       c.config.SaveTokens(c.identifier, toCache.AccessToken, 
toCache.RefreshToken)
+       return nil
+}
diff --git a/oauth2/config_tokenprovider_test.go 
b/oauth2/config_tokenprovider_test.go
new file mode 100644
index 0000000..d949a5a
--- /dev/null
+++ b/oauth2/config_tokenprovider_test.go
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package oauth2
+
+import (
+       . "github.com/onsi/ginkgo"
+       . "github.com/onsi/gomega"
+)
+
+type mockConfigProvider struct {
+       ReturnAccessToken         string
+       ReturnRefreshToken        string
+       GetTokensCalledIdentifier string
+       SavedIdentifier           string
+       SavedAccessToken          string
+       SavedRefreshToken         string
+}
+
+func (m *mockConfigProvider) GetTokens(identifier string) (string, string) {
+       m.GetTokensCalledIdentifier = identifier
+       return m.ReturnAccessToken, m.ReturnRefreshToken
+}
+
+func (m *mockConfigProvider) SaveTokens(identifier, accessToken, refreshToken 
string) {
+       m.SavedIdentifier = identifier
+       m.SavedAccessToken = accessToken
+       m.SavedRefreshToken = refreshToken
+}
+
+var _ = Describe("main", func() {
+       Describe("configCachingProvider", func() {
+               It("sets up the identifier using the clientID and audience", 
func() {
+                       p := NewConfigBackedCachingProvider("iamclientid", 
"iamaudience", &mockConfigProvider{})
+
+                       
Expect(p.identifier).To(Equal("iamclientid-iamaudience"))
+               })
+
+               It("gets tokens from the config provider", func() {
+                       c := &mockConfigProvider{
+                               ReturnAccessToken:  "accessToken",
+                               ReturnRefreshToken: "refreshToken",
+                       }
+                       p := ConfigBackedCachingProvider{
+                               identifier: "iamidentifier",
+                               config:     c,
+                       }
+
+                       r, err := p.GetTokens()
+
+                       Expect(err).NotTo(HaveOccurred())
+                       
Expect(c.GetTokensCalledIdentifier).To(Equal(p.identifier))
+                       Expect(r).To(Equal(&TokenResult{
+                               AccessToken:  c.ReturnAccessToken,
+                               RefreshToken: c.ReturnRefreshToken,
+                       }))
+               })
+
+               It("caches the tokens in the config provider", func() {
+                       c := &mockConfigProvider{}
+                       p := ConfigBackedCachingProvider{
+                               identifier: "iamidentifier",
+                               config:     c,
+                       }
+                       toSave := &TokenResult{
+                               AccessToken:  "accessToken",
+                               RefreshToken: "refreshToken",
+                       }
+
+                       p.CacheTokens(toSave)
+
+                       Expect(c.SavedIdentifier).To(Equal(p.identifier))
+                       Expect(c.SavedAccessToken).To(Equal(toSave.AccessToken))
+                       
Expect(c.SavedRefreshToken).To(Equal(toSave.RefreshToken))
+               })
+       })
+})
diff --git a/oauth2/device_code_flow.go b/oauth2/device_code_flow.go
new file mode 100644
index 0000000..486fdfa
--- /dev/null
+++ b/oauth2/device_code_flow.go
@@ -0,0 +1,203 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package oauth2
+
+import (
+       "context"
+       "fmt"
+       "net/http"
+       "time"
+
+       "github.com/apache/pulsar-client-go/oauth2/clock"
+
+       "github.com/pkg/errors"
+)
+
+// DeviceCodeFlow takes care of the mechanics needed for getting an access
+// token using the OAuth 2.0 "Device Code Flow"
+type DeviceCodeFlow struct {
+       options                DeviceCodeFlowOptions
+       oidcWellKnownEndpoints OIDCWellKnownEndpoints
+       codeProvider           DeviceCodeProvider
+       exchanger              DeviceTokenExchanger
+       callback               DeviceCodeCallback
+       clock                  clock.Clock
+}
+
+// AuthorizationCodeProvider abstracts getting an authorization code
+type DeviceCodeProvider interface {
+       GetCode(audience string, additionalScopes ...string) 
(*DeviceCodeResult, error)
+}
+
+// DeviceTokenExchanger abstracts exchanging for tokens
+type DeviceTokenExchanger interface {
+       ExchangeDeviceCode(ctx context.Context, req DeviceCodeExchangeRequest) 
(*TokenResult, error)
+       ExchangeRefreshToken(req RefreshTokenExchangeRequest) (*TokenResult, 
error)
+}
+
+type DeviceCodeCallback func(code *DeviceCodeResult) error
+
+type DeviceCodeFlowOptions struct {
+       IssuerEndpoint   string
+       ClientID         string
+       AdditionalScopes []string
+       AllowRefresh     bool
+}
+
+func newDeviceCodeFlow(
+       options DeviceCodeFlowOptions,
+       oidcWellKnownEndpoints OIDCWellKnownEndpoints,
+       codeProvider DeviceCodeProvider,
+       exchanger DeviceTokenExchanger,
+       callback DeviceCodeCallback,
+       clock clock.Clock) *DeviceCodeFlow {
+       return &DeviceCodeFlow{
+               options:                options,
+               oidcWellKnownEndpoints: oidcWellKnownEndpoints,
+               codeProvider:           codeProvider,
+               exchanger:              exchanger,
+               callback:               callback,
+               clock:                  clock,
+       }
+}
+
+// NewDefaultDeviceCodeFlow provides an easy way to build up a default
+// device code flow with all the correct configuration. If refresh tokens 
should
+// be allowed pass in true for <allowRefresh>
+func NewDefaultDeviceCodeFlow(options DeviceCodeFlowOptions,
+       callback DeviceCodeCallback) (*DeviceCodeFlow, error) {
+       wellKnownEndpoints, err := 
GetOIDCWellKnownEndpointsFromIssuerURL(options.IssuerEndpoint)
+       if err != nil {
+               return nil, err
+       }
+
+       codeProvider := NewLocalDeviceCodeProvider(
+               LocalDeviceCodeProviderOptions{
+                       ClientID: options.ClientID,
+               },
+               *wellKnownEndpoints,
+               &http.Client{},
+       )
+
+       tokenRetriever := NewTokenRetriever(&http.Client{})
+
+       return newDeviceCodeFlow(
+               options,
+               *wellKnownEndpoints,
+               codeProvider,
+               tokenRetriever,
+               callback,
+               clock.RealClock{}), nil
+}
+
+var _ Flow = &DeviceCodeFlow{}
+
+func (p *DeviceCodeFlow) Authorize(audience string) (*AuthorizationGrant, 
error) {
+
+       var additionalScopes []string
+       additionalScopes = append(additionalScopes, 
p.options.AdditionalScopes...)
+       if p.options.AllowRefresh {
+               additionalScopes = append(additionalScopes, "offline_access")
+       }
+
+       codeResult, err := p.codeProvider.GetCode(audience, additionalScopes...)
+       if err != nil {
+               return nil, err
+       }
+
+       if p.callback != nil {
+               err := p.callback(codeResult)
+               if err != nil {
+                       return nil, err
+               }
+       }
+
+       exchangeRequest := DeviceCodeExchangeRequest{
+               TokenEndpoint: p.oidcWellKnownEndpoints.TokenEndpoint,
+               ClientID:      p.options.ClientID,
+               DeviceCode:    codeResult.DeviceCode,
+               PollInterval:  time.Duration(codeResult.Interval) * time.Second,
+       }
+
+       tr, err := p.exchanger.ExchangeDeviceCode(context.Background(), 
exchangeRequest)
+       if err != nil {
+               return nil, errors.Wrap(err, "could not exchange code")
+       }
+
+       token := convertToOAuth2Token(tr, p.clock)
+       grant := &AuthorizationGrant{
+               Type:          GrantTypeDeviceCode,
+               Audience:      audience,
+               ClientID:      p.options.ClientID,
+               TokenEndpoint: p.oidcWellKnownEndpoints.TokenEndpoint,
+               Token:         &token,
+       }
+       return grant, nil
+}
+
+type DeviceAuthorizationGrantRefresher struct {
+       exchanger DeviceTokenExchanger
+       clock     clock.Clock
+}
+
+// NewDefaultDeviceAuthorizationGrantRefresher constructs a grant refresher 
based on the result
+// of the device authorization flow.
+func NewDefaultDeviceAuthorizationGrantRefresher(clock clock.Clock) 
(*DeviceAuthorizationGrantRefresher, error) {
+       tokenRetriever := NewTokenRetriever(&http.Client{})
+       return &DeviceAuthorizationGrantRefresher{
+               exchanger: tokenRetriever,
+               clock:     clock,
+       }, nil
+}
+
+var _ AuthorizationGrantRefresher = &DeviceAuthorizationGrantRefresher{}
+
+func (g *DeviceAuthorizationGrantRefresher) Refresh(grant *AuthorizationGrant) 
(*AuthorizationGrant, error) {
+       if grant.Type != GrantTypeDeviceCode {
+               return nil, errors.New("unsupported grant type")
+       }
+       if grant.Token == nil || grant.Token.RefreshToken == "" {
+               return nil, fmt.Errorf("the authorization grant has expired (no 
refresh token); please re-login")
+       }
+
+       exchangeRequest := RefreshTokenExchangeRequest{
+               TokenEndpoint: grant.TokenEndpoint,
+               ClientID:      grant.ClientID,
+               RefreshToken:  grant.Token.RefreshToken,
+       }
+       tr, err := g.exchanger.ExchangeRefreshToken(exchangeRequest)
+       if err != nil {
+               return nil, errors.Wrap(err, "could not exchange refresh token")
+       }
+
+       // RFC 6749 Section 1.5 - token exchange MAY issue a new refresh token 
(otherwise the result is blank).
+       // also see: 
https://tools.ietf.org/html/draft-ietf-oauth-security-topics-13#section-4.12
+       if tr.RefreshToken == "" {
+               tr.RefreshToken = grant.Token.RefreshToken
+       }
+
+       token := convertToOAuth2Token(tr, g.clock)
+       grant = &AuthorizationGrant{
+               Type:          GrantTypeDeviceCode,
+               Audience:      grant.Audience,
+               ClientID:      grant.ClientID,
+               Token:         &token,
+               TokenEndpoint: grant.TokenEndpoint,
+       }
+       return grant, nil
+}
diff --git a/oauth2/device_code_flow_test.go b/oauth2/device_code_flow_test.go
new file mode 100644
index 0000000..a238a48
--- /dev/null
+++ b/oauth2/device_code_flow_test.go
@@ -0,0 +1,230 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package oauth2
+
+import (
+       "errors"
+       "time"
+
+       "github.com/apache/pulsar-client-go/oauth2/clock"
+       "github.com/apache/pulsar-client-go/oauth2/clock/testing"
+       "golang.org/x/oauth2"
+
+       . "github.com/onsi/ginkgo"
+       . "github.com/onsi/gomega"
+)
+
+type MockDeviceCodeProvider struct {
+       Called                     bool
+       CalledWithAudience         string
+       CalledWithAdditionalScopes []string
+       DeviceCodeResult           *DeviceCodeResult
+       ReturnsError               error
+}
+
+func (cp *MockDeviceCodeProvider) GetCode(audience string, additionalScopes 
...string) (*DeviceCodeResult, error) {
+       cp.Called = true
+       cp.CalledWithAudience = audience
+       cp.CalledWithAdditionalScopes = additionalScopes
+       return cp.DeviceCodeResult, cp.ReturnsError
+}
+
+type MockDeviceCodeCallback struct {
+       Called           bool
+       DeviceCodeResult *DeviceCodeResult
+       ReturnsError     error
+}
+
+func (c *MockDeviceCodeCallback) Callback(code *DeviceCodeResult) error {
+       c.Called = true
+       c.DeviceCodeResult = code
+       if c.ReturnsError != nil {
+               return c.ReturnsError
+       }
+       return nil
+}
+
+var _ = Describe("DeviceCodeFlow", func() {
+
+       Describe("Authorize", func() {
+               const audience = "test_clientID"
+
+               var mockClock clock.Clock
+               var mockCodeProvider *MockDeviceCodeProvider
+               var mockTokenExchanger *MockTokenExchanger
+               var mockCallback *MockDeviceCodeCallback
+               var flow *DeviceCodeFlow
+
+               BeforeEach(func() {
+                       mockClock = testing.NewFakeClock(time.Unix(0, 0))
+
+                       mockCodeProvider = &MockDeviceCodeProvider{
+                               DeviceCodeResult: &DeviceCodeResult{
+                                       DeviceCode:              
"test_deviceCode",
+                                       UserCode:                
"test_userCode",
+                                       VerificationURI:         
"http://verification_uri";,
+                                       VerificationURIComplete: 
"http://verification_uri_complete";,
+                                       ExpiresIn:               10,
+                                       Interval:                5,
+                               },
+                       }
+
+                       expectedTokens := TokenResult{AccessToken: 
"accessToken", RefreshToken: "refreshToken", ExpiresIn: 1234}
+                       mockTokenExchanger = &MockTokenExchanger{
+                               ReturnsTokens: &expectedTokens,
+                       }
+
+                       mockCallback = &MockDeviceCodeCallback{}
+
+                       opts := DeviceCodeFlowOptions{
+                               IssuerEndpoint:   "http://issuer";,
+                               ClientID:         "test_clientID",
+                               AdditionalScopes: nil,
+                               AllowRefresh:     true,
+                       }
+                       flow = newDeviceCodeFlow(
+                               opts,
+                               oidcEndpoints,
+                               mockCodeProvider,
+                               mockTokenExchanger,
+                               mockCallback.Callback,
+                               mockClock,
+                       )
+               })
+
+               It("invokes DeviceCodeProvider", func() {
+                       _, _ = flow.Authorize(audience)
+                       Expect(mockCodeProvider.Called).To(BeTrue())
+                       
Expect(mockCodeProvider.CalledWithAdditionalScopes).To(ContainElement("offline_access"))
+               })
+
+               It("invokes callback with returned code", func() {
+                       _, _ = flow.Authorize(audience)
+                       Expect(mockCallback.Called).To(BeTrue())
+                       
Expect(mockCallback.DeviceCodeResult).To(Equal(mockCodeProvider.DeviceCodeResult))
+               })
+
+               It("invokes TokenExchanger with returned code", func() {
+                       _, _ = flow.Authorize(audience)
+                       
Expect(mockTokenExchanger.CalledWithRequest).To(Equal(&DeviceCodeExchangeRequest{
+                               TokenEndpoint: oidcEndpoints.TokenEndpoint,
+                               ClientID:      "test_clientID",
+                               PollInterval:  time.Duration(5) * time.Second,
+                               DeviceCode:    "test_deviceCode",
+                       }))
+               })
+
+               It("returns an authorization grant", func() {
+                       grant, _ := flow.Authorize(audience)
+                       Expect(grant).ToNot(BeNil())
+                       Expect(grant.Audience).To(Equal(audience))
+                       Expect(grant.ClientID).To(Equal("test_clientID"))
+                       Expect(grant.ClientCredentials).To(BeNil())
+                       
Expect(grant.TokenEndpoint).To(Equal(oidcEndpoints.TokenEndpoint))
+                       expected := 
convertToOAuth2Token(mockTokenExchanger.ReturnsTokens, mockClock)
+                       Expect(*grant.Token).To(Equal(expected))
+               })
+       })
+})
+
+var _ = Describe("DeviceAuthorizationGrantRefresher", func() {
+
+       Describe("Refresh", func() {
+               var mockClock clock.Clock
+               var mockTokenExchanger *MockTokenExchanger
+               var refresher *DeviceAuthorizationGrantRefresher
+               var grant *AuthorizationGrant
+
+               BeforeEach(func() {
+                       mockClock = testing.NewFakeClock(time.Unix(0, 0))
+
+                       mockTokenExchanger = &MockTokenExchanger{}
+
+                       refresher = &DeviceAuthorizationGrantRefresher{
+                               exchanger: mockTokenExchanger,
+                               clock:     mockClock,
+                       }
+
+                       token := oauth2.Token{AccessToken: "gat", RefreshToken: 
"grt", Expiry: time.Unix(1, 0)}
+                       grant = &AuthorizationGrant{
+                               Type:          GrantTypeDeviceCode,
+                               ClientID:      "test_clientID",
+                               TokenEndpoint: oidcEndpoints.TokenEndpoint,
+                               Token:         &token,
+                       }
+               })
+
+               It("invokes the token exchanger", func() {
+                       mockTokenExchanger.ReturnsTokens = &TokenResult{
+                               AccessToken: "new token",
+                       }
+
+                       _, _ = refresher.Refresh(grant)
+                       
Expect(*mockTokenExchanger.RefreshCalledWithRequest).To(Equal(RefreshTokenExchangeRequest{
+                               TokenEndpoint: oidcEndpoints.TokenEndpoint,
+                               ClientID:      grant.ClientID,
+                               RefreshToken:  "grt",
+                       }))
+               })
+
+               It("returns the refreshed access token from the 
TokenExchanger", func() {
+                       mockTokenExchanger.ReturnsTokens = &TokenResult{
+                               AccessToken: "new token",
+                       }
+
+                       grant, _ = refresher.Refresh(grant)
+                       
Expect(grant.Token.AccessToken).To(Equal(mockTokenExchanger.ReturnsTokens.AccessToken))
+               })
+
+               It("preserves the existing refresh token from the 
TokenExchanger", func() {
+                       mockTokenExchanger.ReturnsTokens = &TokenResult{
+                               AccessToken: "new token",
+                       }
+
+                       grant, _ = refresher.Refresh(grant)
+                       Expect(grant.Token.RefreshToken).To(Equal("grt"))
+               })
+
+               It("returns the refreshed refresh token from the 
TokenExchanger", func() {
+                       mockTokenExchanger.ReturnsTokens = &TokenResult{
+                               AccessToken:  "new token",
+                               RefreshToken: "new token",
+                       }
+
+                       grant, _ = refresher.Refresh(grant)
+                       Expect(grant.Token.RefreshToken).To(Equal("new token"))
+               })
+
+               It("returns a meaningful expiration time", func() {
+                       mockTokenExchanger.ReturnsTokens = &TokenResult{
+                               AccessToken: "new token",
+                               ExpiresIn:   60,
+                       }
+
+                       grant, _ = refresher.Refresh(grant)
+                       
Expect(grant.Token.Expiry).To(Equal(mockClock.Now().Add(time.Duration(60) * 
time.Second)))
+               })
+
+               It("returns an error when TokenExchanger does", func() {
+                       mockTokenExchanger.ReturnsError = 
errors.New("someerror")
+
+                       _, err := refresher.Refresh(grant)
+                       Expect(err.Error()).To(Equal("could not exchange 
refresh token: someerror"))
+               })
+       })
+})
diff --git a/oauth2/device_code_provider.go b/oauth2/device_code_provider.go
new file mode 100644
index 0000000..23b226b
--- /dev/null
+++ b/oauth2/device_code_provider.go
@@ -0,0 +1,133 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package oauth2
+
+import (
+       "encoding/json"
+       "fmt"
+       "net/http"
+       "net/url"
+       "strconv"
+       "strings"
+)
+
+// DeviceCodeProvider holds the information needed to easily get a
+// device code locally.
+type LocalDeviceCodeProvider struct {
+       options                LocalDeviceCodeProviderOptions
+       oidcWellKnownEndpoints OIDCWellKnownEndpoints
+       transport              HTTPAuthTransport
+}
+
+type DeviceCodeRequest struct {
+       ClientID string
+       Scopes   []string
+       Audience string
+}
+
+// DeviceCodeResult holds the device code gotten from the device code URL.
+type DeviceCodeResult struct {
+       DeviceCode              string `json:"device_code"`
+       UserCode                string `json:"user_code"`
+       VerificationURI         string `json:"verification_uri"`
+       VerificationURIComplete string `json:"verification_uri_complete"`
+       ExpiresIn               int    `json:"expires_in"`
+       Interval                int    `json:"interval"`
+}
+
+type LocalDeviceCodeProviderOptions struct {
+       ClientID string
+}
+
+// NewLocalDeviceCodeProvider allows for the easy setup of 
LocalDeviceCodeProvider
+func NewLocalDeviceCodeProvider(
+       options LocalDeviceCodeProviderOptions,
+       oidcWellKnownEndpoints OIDCWellKnownEndpoints,
+       authTransport HTTPAuthTransport) *LocalDeviceCodeProvider {
+       return &LocalDeviceCodeProvider{
+               options,
+               oidcWellKnownEndpoints,
+               authTransport,
+       }
+}
+
+// GetCode obtains a new device code. Additional scopes
+// beyond openid and email can be sent by passing in arguments for
+// <additionalScopes>.
+func (cp *LocalDeviceCodeProvider) GetCode(audience string, additionalScopes 
...string) (*DeviceCodeResult, error) {
+       request, err := cp.newDeviceCodeRequest(&DeviceCodeRequest{
+               ClientID: cp.options.ClientID,
+               Scopes:   append([]string{"openid", "email"}, 
additionalScopes...),
+               Audience: audience,
+       })
+       if err != nil {
+               return nil, err
+       }
+
+       response, err := cp.transport.Do(request)
+       if err != nil {
+               return nil, err
+       }
+
+       dcr, err := cp.handleDeviceCodeResponse(response)
+       if err != nil {
+               return nil, err
+       }
+
+       return dcr, nil
+}
+
+// newDeviceCodeRequest builds a new DeviceCodeRequest wrapped in an
+// http.Request
+func (cp *LocalDeviceCodeProvider) newDeviceCodeRequest(
+       req *DeviceCodeRequest) (*http.Request, error) {
+       uv := url.Values{}
+       uv.Set("client_id", req.ClientID)
+       uv.Set("scope", strings.Join(req.Scopes, " "))
+       uv.Set("audience", req.Audience)
+       euv := uv.Encode()
+
+       request, err := http.NewRequest("POST",
+               cp.oidcWellKnownEndpoints.DeviceAuthorizationEndpoint,
+               strings.NewReader(euv),
+       )
+       if err != nil {
+               return nil, err
+       }
+
+       request.Header.Add("Content-Type", "application/x-www-form-urlencoded")
+       request.Header.Add("Content-Length", strconv.Itoa(len(euv)))
+
+       return request, nil
+}
+
+func (cp *LocalDeviceCodeProvider) handleDeviceCodeResponse(resp 
*http.Response) (*DeviceCodeResult, error) {
+       if resp.StatusCode < 200 || resp.StatusCode >= 300 {
+               return nil, fmt.Errorf("a non-success status code was received: 
%d", resp.StatusCode)
+       }
+
+       defer resp.Body.Close()
+
+       dcr := DeviceCodeResult{}
+       err := json.NewDecoder(resp.Body).Decode(&dcr)
+       if err != nil {
+               return nil, err
+       }
+
+       return &dcr, nil
+}
diff --git a/oauth2/go.mod b/oauth2/go.mod
new file mode 100644
index 0000000..d185ef0
--- /dev/null
+++ b/oauth2/go.mod
@@ -0,0 +1,12 @@
+module github.com/apache/pulsar-client-go/oauth2
+
+go 1.13
+
+require (
+       github.com/99designs/keyring v1.1.5
+       github.com/dgrijalva/jwt-go v3.2.0+incompatible
+       github.com/onsi/ginkgo v1.14.0
+       github.com/onsi/gomega v1.10.1
+       github.com/pkg/errors v0.9.1
+       golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d
+)
diff --git a/oauth2/go.sum b/oauth2/go.sum
new file mode 100644
index 0000000..5063d2f
--- /dev/null
+++ b/oauth2/go.sum
@@ -0,0 +1,113 @@
+cloud.google.com/go v0.34.0/go.mod 
h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
+github.com/99designs/keyring v1.1.5 
h1:wLv7QyzYpFIyMSwOADq1CLTF9KbjbBfcnfmOGJ64aO4=
+github.com/99designs/keyring v1.1.5/go.mod 
h1:7hsVvt2qXgtadGevGJ4ujg+u8m6SpJ5TpHqTozIPqf0=
+github.com/danieljoos/wincred v1.0.2 
h1:zf4bhty2iLuwgjgpraD2E9UbvO+fe54XXGJbOwe23fU=
+github.com/danieljoos/wincred v1.0.2/go.mod 
h1:SnuYRW9lp1oJrZX/dXJqr0cPK5gYXqx3EJbmjhLdK9U=
+github.com/davecgh/go-spew v1.1.0/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1 
h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
+github.com/davecgh/go-spew v1.1.1/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/dgrijalva/jwt-go v3.2.0+incompatible 
h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
+github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod 
h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
+github.com/dvsekhvalnov/jose2go v0.0.0-20180829124132-7f401d37b68a 
h1:mq+R6XEM6lJX5VlLyZIrUSP8tSuJp82xTK89hvBwJbU=
+github.com/dvsekhvalnov/jose2go v0.0.0-20180829124132-7f401d37b68a/go.mod 
h1:7BvyPhdbLxMXIYTFPLsyJRFMsKmOZnQmzh6Gb+uquuM=
+github.com/fsnotify/fsnotify v1.4.7/go.mod 
h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
+github.com/fsnotify/fsnotify v1.4.9 
h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
+github.com/fsnotify/fsnotify v1.4.9/go.mod 
h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
+github.com/go-logr/logr v0.1.0/go.mod 
h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7sIas=
+github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 
h1:ZpnhV/YsD2/4cESfV5+Hoeu/iUR3ruzNvZ+yQfO03a0=
+github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2/go.mod 
h1:bBOAhwG1umN6/6ZUMtDFBMQR8jRg9O75tm9K00oMsK4=
+github.com/golang/protobuf v1.2.0/go.mod 
h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
+github.com/golang/protobuf v1.4.0-rc.1/go.mod 
h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
+github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod 
h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
+github.com/golang/protobuf v1.4.0-rc.2/go.mod 
h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
+github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod 
h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w=
+github.com/golang/protobuf v1.4.0/go.mod 
h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
+github.com/golang/protobuf v1.4.2 
h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0=
+github.com/golang/protobuf v1.4.2/go.mod 
h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
+github.com/google/go-cmp v0.3.0/go.mod 
h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
+github.com/google/go-cmp v0.3.1/go.mod 
h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
+github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
+github.com/google/go-cmp v0.4.0/go.mod 
h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c 
h1:6rhixN/i8ZofjG1Y75iExal34USq5p+wiN1tpie8IrU=
+github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c/go.mod 
h1:NMPJylDgVpX0MLRlPy15sqSwOFv/U1GZ2m21JhFfek0=
+github.com/hpcloud/tail v1.0.0/go.mod 
h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
+github.com/keybase/go-keychain v0.0.0-20190712205309-48d3d31d256d 
h1:Z+RDyXzjKE0i2sTjZ/b1uxiGtPhFy34Ou/Tk0qwN0kM=
+github.com/keybase/go-keychain v0.0.0-20190712205309-48d3d31d256d/go.mod 
h1:JJNrCn9otv/2QP4D7SMJBgaleKpOf66PnW6F5WGNRIc=
+github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
+github.com/kr/pretty v0.1.0/go.mod 
h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
+github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
+github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
+github.com/kr/text v0.1.0/go.mod 
h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
+github.com/mitchellh/go-homedir v1.1.0 
h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y=
+github.com/mitchellh/go-homedir v1.1.0/go.mod 
h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
+github.com/mtibben/percent v0.2.1 
h1:5gssi8Nqo8QU/r2pynCm+hBQHpkB/uNK7BJCFogWdzs=
+github.com/mtibben/percent v0.2.1/go.mod 
h1:KG9uO+SZkUp+VkRHsCdYQV3XSZrrSpR3O9ibNBTZrns=
+github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78=
+github.com/nxadm/tail v1.4.4/go.mod 
h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
+github.com/onsi/ginkgo v1.6.0/go.mod 
h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
+github.com/onsi/ginkgo v1.12.1/go.mod 
h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk=
+github.com/onsi/ginkgo v1.14.0 h1:2mOpI4JVVPBN+WQRa0WKH2eXR+Ey+uK4n7Zj0aYpIQA=
+github.com/onsi/ginkgo v1.14.0/go.mod 
h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY=
+github.com/onsi/gomega v1.7.1/go.mod 
h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
+github.com/onsi/gomega v1.10.1 h1:o0+MgICZLuZ7xjH7Vx6zS/zcu93/BEp1VwkIW1mEXCE=
+github.com/onsi/gomega v1.10.1/go.mod 
h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
+github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
+github.com/pkg/errors v0.9.1/go.mod 
h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
+github.com/pmezard/go-difflib v1.0.0 
h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/pmezard/go-difflib v1.0.0/go.mod 
h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/spf13/afero v1.2.2/go.mod 
h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTdifk=
+github.com/stretchr/objx v0.1.0/go.mod 
h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/objx v0.2.0 h1:Hbg2NidpLE8veEBkEZTL3CvlkUIVzuU9jDplZO54c48=
+github.com/stretchr/objx v0.2.0/go.mod 
h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
+github.com/stretchr/testify v1.3.0 
h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
+github.com/stretchr/testify v1.3.0/go.mod 
h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
+golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod 
h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 
h1:HuIa8hRrWRSrqYzx1qI49NNxhdi2PrY7gxVSq1JjLDc=
+golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod 
h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
+golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod 
h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod 
h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod 
h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod 
h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7 
h1:AeiKBIuRw3UomYXSbLy0Mc2dDLfdtbT/IVn4keq83P0=
+golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod 
h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
+golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d 
h1:TzXSXBo42m9gQenoE3b9BGiEpg5IG2JkU5FkPIawgtw=
+golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod 
h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
+golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod 
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod 
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod 
h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod 
h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod 
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20190712062909-fae7ac547cb7/go.mod 
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod 
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod 
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod 
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod 
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20200519105757-fe76b779f299 
h1:DYfZAGf2WMFjMxbgTjaC+2HC7NkNAQs+6Q8b9WEB/F4=
+golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod 
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
+golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
+golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod 
h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 
h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
+golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod 
h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+google.golang.org/appengine v1.4.0 
h1:/wp5JvzpHIxhs/dumFmF7BXTf3Z+dd4uXta4kVyO508=
+google.golang.org/appengine v1.4.0/go.mod 
h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
+google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod 
h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
+google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod 
h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
+google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod 
h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
+google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod 
h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE=
+google.golang.org/protobuf v1.21.0/go.mod 
h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
+google.golang.org/protobuf v1.23.0 
h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyzM=
+google.golang.org/protobuf v1.23.0/go.mod 
h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod 
h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 
h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
+gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod 
h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/fsnotify.v1 v1.4.7/go.mod 
h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
+gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 
h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
+gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod 
h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
+gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
+gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE=
+k8s.io/utils v0.0.0-20200619165400-6e3d28b6ed19 
h1:7Nu2dTj82c6IaWvL7hImJzcXoTPz1MsSCH7r+0m6rfo=
+k8s.io/utils v0.0.0-20200619165400-6e3d28b6ed19/go.mod 
h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
diff --git a/oauth2/oidc_endpoint_provider.go b/oauth2/oidc_endpoint_provider.go
new file mode 100644
index 0000000..32986b7
--- /dev/null
+++ b/oauth2/oidc_endpoint_provider.go
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package oauth2
+
+import (
+       "encoding/json"
+       "net/http"
+       "net/url"
+       "path"
+
+       "github.com/pkg/errors"
+)
+
+// OIDCWellKnownEndpoints holds the well known OIDC endpoints
+type OIDCWellKnownEndpoints struct {
+       AuthorizationEndpoint       string `json:"authorization_endpoint"`
+       TokenEndpoint               string `json:"token_endpoint"`
+       DeviceAuthorizationEndpoint string 
`json:"device_authorization_endpoint"`
+}
+
+// GetOIDCWellKnownEndpointsFromIssuerURL gets the well known endpoints for the
+// passed in issuer url
+func GetOIDCWellKnownEndpointsFromIssuerURL(issuerURL string) 
(*OIDCWellKnownEndpoints, error) {
+       u, err := url.Parse(issuerURL)
+       if err != nil {
+               return nil, errors.Wrap(err, "could not parse issuer url to 
build well known endpoints")
+       }
+       u.Path = path.Join(u.Path, ".well-known/openid-configuration")
+
+       r, err := http.Get(u.String())
+       if err != nil {
+               return nil, errors.Wrapf(err, "could not get well known 
endpoints from url %s", u.String())
+       }
+       defer r.Body.Close()
+
+       var wkEndpoints OIDCWellKnownEndpoints
+       err = json.NewDecoder(r.Body).Decode(&wkEndpoints)
+       if err != nil {
+               return nil, errors.Wrap(err, "could not decode json body when 
getting well known endpoints")
+       }
+
+       return &wkEndpoints, nil
+}
diff --git a/oauth2/oidc_endpoint_provider_test.go 
b/oauth2/oidc_endpoint_provider_test.go
new file mode 100644
index 0000000..4ebce3b
--- /dev/null
+++ b/oauth2/oidc_endpoint_provider_test.go
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package oauth2
+
+import (
+       "encoding/json"
+       "net/http"
+       "net/http/httptest"
+
+       . "github.com/onsi/ginkgo"
+       . "github.com/onsi/gomega"
+)
+
+var _ = Describe("GetOIDCWellKnownEndpointsFromIssuerURL", func() {
+       It("calls and gets the well known data from the correct endpoint for 
the issuer", func() {
+               var req *http.Request
+               wkEndpointsResp := OIDCWellKnownEndpoints{
+                       AuthorizationEndpoint: "the-auth-endpoint", 
TokenEndpoint: "the-token-endpoint"}
+               responseBytes, err := json.Marshal(wkEndpointsResp)
+               Expect(err).ToNot(HaveOccurred())
+
+               ts := httptest.NewServer(http.HandlerFunc(func(w 
http.ResponseWriter, r *http.Request) {
+                       req = r
+
+                       w.WriteHeader(http.StatusOK)
+                       w.Write(responseBytes)
+
+               }))
+               defer ts.Close()
+
+               endpoints, err := GetOIDCWellKnownEndpointsFromIssuerURL(ts.URL)
+
+               Expect(err).ToNot(HaveOccurred())
+               Expect(*endpoints).To(Equal(wkEndpointsResp))
+               
Expect(req.URL.Path).To(Equal("/.well-known/openid-configuration"))
+       })
+
+       It("errors when url.Parse errors", func() {
+               endpoints, err := GetOIDCWellKnownEndpointsFromIssuerURL("://")
+
+               Expect(err).To(HaveOccurred())
+               Expect(err.Error()).To(Equal(
+                       "could not parse issuer url to build well known 
endpoints: parse ://: missing protocol scheme"))
+               Expect(endpoints).To(BeNil())
+       })
+
+       It("errors when the get errors", func() {
+               endpoints, err := 
GetOIDCWellKnownEndpointsFromIssuerURL("https://";)
+
+               Expect(err).To(HaveOccurred())
+               Expect(err.Error()).To(Equal(
+                       "could not get well known endpoints from url 
https://.well-known/openid-configuration: " +
+                               "Get https://.well-known/openid-configuration: 
dial tcp: lookup .well-known: no such host"))
+               Expect(endpoints).To(BeNil())
+       })
+
+       It("errors when the json decoder errors", func() {
+               var req *http.Request
+
+               ts := httptest.NewServer(http.HandlerFunc(func(w 
http.ResponseWriter, r *http.Request) {
+                       req = r
+
+                       w.WriteHeader(http.StatusOK)
+                       w.Write([]byte("<"))
+
+               }))
+               defer ts.Close()
+
+               endpoints, err := GetOIDCWellKnownEndpointsFromIssuerURL(ts.URL)
+
+               Expect(err).To(HaveOccurred())
+               Expect(err.Error()).To(Equal("could not decode json body when 
getting well" +
+                       " known endpoints: invalid character '<' looking for 
beginning of value"))
+               Expect(endpoints).To(BeNil())
+               
Expect(req.URL.Path).To(Equal("/.well-known/openid-configuration"))
+       })
+})
diff --git a/oauth2/store/keyring.go b/oauth2/store/keyring.go
new file mode 100644
index 0000000..fd49baf
--- /dev/null
+++ b/oauth2/store/keyring.go
@@ -0,0 +1,194 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package store
+
+import (
+       "crypto/sha1"
+       "encoding/json"
+       "fmt"
+       "sync"
+
+       "github.com/99designs/keyring"
+       "github.com/apache/pulsar-client-go/oauth2"
+       "github.com/apache/pulsar-client-go/oauth2/clock"
+)
+
+type KeyringStore struct {
+       kr    keyring.Keyring
+       clock clock.Clock
+       lock  sync.Mutex
+}
+
+// storedItem represents an item stored in the keyring
+type storedItem struct {
+       Audience string
+       UserName string
+       Grant    oauth2.AuthorizationGrant
+}
+
+// NewKeyringStore creates a store based on a keyring.
+func NewKeyringStore(kr keyring.Keyring) (*KeyringStore, error) {
+       return &KeyringStore{
+               kr:    kr,
+               clock: clock.RealClock{},
+       }, nil
+}
+
+var _ Store = &KeyringStore{}
+
+func (f *KeyringStore) SaveGrant(audience string, grant 
oauth2.AuthorizationGrant) error {
+       f.lock.Lock()
+       defer f.lock.Unlock()
+
+       var err error
+       var userName string
+       switch grant.Type {
+       case oauth2.GrantTypeClientCredentials:
+               if grant.ClientCredentials == nil {
+                       return ErrUnsupportedAuthData
+               }
+               userName = grant.ClientCredentials.ClientEmail
+       case oauth2.GrantTypeDeviceCode:
+               if grant.Token == nil {
+                       return ErrUnsupportedAuthData
+               }
+               userName, err = oauth2.ExtractUserName(*grant.Token)
+               if err != nil {
+                       return err
+               }
+       default:
+               return ErrUnsupportedAuthData
+       }
+       item := storedItem{
+               Audience: audience,
+               UserName: userName,
+               Grant:    grant,
+       }
+       err = f.setItem(item)
+       if err != nil {
+               return err
+       }
+       return nil
+}
+
+func (f *KeyringStore) LoadGrant(audience string) (*oauth2.AuthorizationGrant, 
error) {
+       f.lock.Lock()
+       defer f.lock.Unlock()
+
+       item, err := f.getItem(audience)
+       if err != nil {
+               if err == keyring.ErrKeyNotFound {
+                       return nil, ErrNoAuthenticationData
+               }
+               return nil, err
+       }
+       switch item.Grant.Type {
+       case oauth2.GrantTypeClientCredentials:
+               if item.Grant.ClientCredentials == nil {
+                       return nil, ErrUnsupportedAuthData
+               }
+       case oauth2.GrantTypeDeviceCode:
+               if item.Grant.Token == nil {
+                       return nil, ErrUnsupportedAuthData
+               }
+       default:
+               return nil, ErrUnsupportedAuthData
+       }
+       return &item.Grant, nil
+}
+
+func (f *KeyringStore) WhoAmI(audience string) (string, error) {
+       f.lock.Lock()
+       defer f.lock.Unlock()
+
+       key := hashKeyringKey(audience)
+       authItem, err := f.kr.GetMetadata(key)
+       if err != nil {
+               if err == keyring.ErrKeyNotFound {
+                       return "", ErrNoAuthenticationData
+               }
+               return "", fmt.Errorf("unable to get information from the 
keyring: %v", err)
+       }
+       return authItem.Label, nil
+}
+
+func (f *KeyringStore) Logout() error {
+       f.lock.Lock()
+       defer f.lock.Unlock()
+
+       var err error
+       keys, err := f.kr.Keys()
+       if err != nil {
+               return fmt.Errorf("unable to get information from the keyring: 
%v", err)
+       }
+       for _, key := range keys {
+               err = f.kr.Remove(key)
+       }
+       if err != nil {
+               return fmt.Errorf("unable to update the keyring: %v", err)
+       }
+       return nil
+}
+
+func (f *KeyringStore) getItem(audience string) (storedItem, error) {
+       key := hashKeyringKey(audience)
+       i, err := f.kr.Get(key)
+       if err != nil {
+               return storedItem{}, err
+       }
+       var grant oauth2.AuthorizationGrant
+       err = json.Unmarshal(i.Data, &grant)
+       if err != nil {
+               // the grant appears to be invalid
+               return storedItem{}, ErrUnsupportedAuthData
+       }
+       return storedItem{
+               Audience: audience,
+               UserName: i.Label,
+               Grant:    grant,
+       }, nil
+}
+
+func (f *KeyringStore) setItem(item storedItem) error {
+       key := hashKeyringKey(item.Audience)
+       data, err := json.Marshal(item.Grant)
+       if err != nil {
+               return err
+       }
+       i := keyring.Item{
+               Key:                         key,
+               Data:                        data,
+               Label:                       item.UserName,
+               Description:                 "authorization grant",
+               KeychainNotTrustApplication: false,
+               KeychainNotSynchronizable:   false,
+       }
+       err = f.kr.Set(i)
+       if err != nil {
+               return fmt.Errorf("unable to update the keyring: %v", err)
+       }
+       return nil
+}
+
+// hashKeyringKey creates a safe key based on the given string
+func hashKeyringKey(s string) string {
+       h := sha1.New()
+       h.Write([]byte(s))
+       bs := h.Sum(nil)
+       return fmt.Sprintf("%x", bs)
+}
diff --git a/oauth2/store/memory.go b/oauth2/store/memory.go
new file mode 100644
index 0000000..07c7594
--- /dev/null
+++ b/oauth2/store/memory.go
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package store
+
+import (
+       "sync"
+
+       "github.com/apache/pulsar-client-go/oauth2"
+       "github.com/apache/pulsar-client-go/oauth2/clock"
+)
+
+type MemoryStore struct {
+       clock  clock.Clock
+       lock   sync.Mutex
+       grants map[string]*oauth2.AuthorizationGrant
+}
+
+func NewMemoryStore() Store {
+       return &MemoryStore{
+               clock:  clock.RealClock{},
+               grants: make(map[string]*oauth2.AuthorizationGrant),
+       }
+}
+
+var _ Store = &MemoryStore{}
+
+func (f *MemoryStore) SaveGrant(audience string, grant 
oauth2.AuthorizationGrant) error {
+       f.lock.Lock()
+       defer f.lock.Unlock()
+       f.grants[audience] = &grant
+       return nil
+}
+
+func (f *MemoryStore) LoadGrant(audience string) (*oauth2.AuthorizationGrant, 
error) {
+       f.lock.Lock()
+       defer f.lock.Unlock()
+       grant, ok := f.grants[audience]
+       if !ok {
+               return nil, ErrNoAuthenticationData
+       }
+       return grant, nil
+}
+
+func (f *MemoryStore) WhoAmI(audience string) (string, error) {
+       f.lock.Lock()
+       defer f.lock.Unlock()
+       grant, ok := f.grants[audience]
+       if !ok {
+               return "", ErrNoAuthenticationData
+       }
+       switch grant.Type {
+       case oauth2.GrantTypeClientCredentials:
+               if grant.ClientCredentials == nil {
+                       return "", ErrUnsupportedAuthData
+               }
+               return grant.ClientCredentials.ClientEmail, nil
+       case oauth2.GrantTypeDeviceCode:
+               if grant.Token == nil {
+                       return "", ErrUnsupportedAuthData
+               }
+               return oauth2.ExtractUserName(*grant.Token)
+       default:
+               return "", ErrUnsupportedAuthData
+       }
+}
+
+func (f *MemoryStore) Logout() error {
+       f.lock.Lock()
+       defer f.lock.Unlock()
+       f.grants = map[string]*oauth2.AuthorizationGrant{}
+       return nil
+}
diff --git a/oauth2/store/store.go b/oauth2/store/store.go
new file mode 100644
index 0000000..55d4c9e
--- /dev/null
+++ b/oauth2/store/store.go
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package store
+
+import (
+       "errors"
+
+       "github.com/apache/pulsar-client-go/oauth2"
+)
+
+// ErrNoAuthenticationData indicates that stored authentication data is not 
available
+var ErrNoAuthenticationData = errors.New("authentication data is not 
available")
+
+// ErrUnsupportedAuthData ndicates that stored authentication data is unusable
+var ErrUnsupportedAuthData = errors.New("authentication data is not usable")
+
+// Store is responsible for persisting authorization grants
+type Store interface {
+       // SaveGrant stores an authorization grant for a given audience
+       SaveGrant(audience string, grant oauth2.AuthorizationGrant) error
+
+       // LoadGrant loads an authorization grant for a given audience
+       LoadGrant(audience string) (*oauth2.AuthorizationGrant, error)
+
+       // WhoAmI returns the current user name (or an error if nobody is 
logged in)
+       WhoAmI(audience string) (string, error)
+
+       // Logout deletes all stored credentials
+       Logout() error
+}
diff --git a/pulsar/client.go b/pulsar/client.go
index d4af906..460b275 100644
--- a/pulsar/client.go
+++ b/pulsar/client.go
@@ -62,6 +62,11 @@ func NewAuthenticationAthenz(authParams map[string]string) 
Authentication {
        return athenz
 }
 
+func NewAuthenticationOAuth2(authParams map[string]string) Authentication {
+       oauth, _ := auth.NewAuthenticationOAuth2WithParams(authParams)
+       return oauth
+}
+
 // Builder interface that is used to construct a Pulsar Client instance.
 type ClientOptions struct {
        // Configure the service URL for the Pulsar service.
diff --git a/pulsar/client_impl_test.go b/pulsar/client_impl_test.go
index 308c8e0..276e4fb 100644
--- a/pulsar/client_impl_test.go
+++ b/pulsar/client_impl_test.go
@@ -20,9 +20,13 @@ package pulsar
 import (
        "fmt"
        "io/ioutil"
+       "net/http"
+       "net/http/httptest"
+       "os"
        "testing"
        "time"
 
+       "github.com/apache/pulsar-client-go/pulsar/internal/auth"
        "github.com/stretchr/testify/assert"
 )
 
@@ -219,6 +223,96 @@ func TestTokenAuthFromFile(t *testing.T) {
        client.Close()
 }
 
+// mockOAuthServer will mock a oauth service for the tests
+func mockOAuthServer() *httptest.Server {
+       // prepare a port for the mocked server
+       server := httptest.NewUnstartedServer(http.DefaultServeMux)
+
+       // mock the used REST path for the tests
+       mockedHandler := http.NewServeMux()
+       mockedHandler.HandleFunc("/.well-known/openid-configuration", 
func(writer http.ResponseWriter, request *http.Request) {
+               s := fmt.Sprintf(`{
+    "issuer":"%s",
+    "authorization_endpoint":"%s/authorize",
+    "token_endpoint":"%s/oauth/token",
+    "device_authorization_endpoint":"%s/oauth/device/code"
+}`, server.URL, server.URL, server.URL, server.URL)
+               fmt.Fprintln(writer, s)
+       })
+       mockedHandler.HandleFunc("/oauth/token", func(writer 
http.ResponseWriter, request *http.Request) {
+               fmt.Fprintln(writer, "{\n"+
+                       "  \"access_token\": 
\"eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0b2tlbi1wcmluY2lwYWwifQ."+
+                       "tSfgR8l7dKC6LoWCxQgNkuSB8our7xV_nAM7wpgCbG4\",\n"+
+                       "  \"token_type\": \"Bearer\"\n}")
+       })
+       mockedHandler.HandleFunc("/authorize", func(writer http.ResponseWriter, 
request *http.Request) {
+               fmt.Fprintln(writer, "true")
+       })
+
+       server.Config.Handler = mockedHandler
+       server.Start()
+
+       return server
+}
+
+// mockKeyFile will mock a temp key file for testing.
+func mockKeyFile(server string) (string, error) {
+       pwd, err := os.Getwd()
+       if err != nil {
+               return "", err
+       }
+       kf, err := ioutil.TempFile(pwd, "test_oauth2")
+       if err != nil {
+               return "", err
+       }
+       _, err = kf.WriteString(fmt.Sprintf(`{
+  "type":"sn_service_account",
+  "client_id":"client-id",
+  "client_secret":"client-secret",
+  "client_email":"[email protected]",
+  "issuer_url":"%s"
+}`, server))
+       if err != nil {
+               return "", err
+       }
+
+       return kf.Name(), nil
+}
+
+func TestOAuth2Auth(t *testing.T) {
+       server := mockOAuthServer()
+       defer server.Close()
+       kf, err := mockKeyFile(server.URL)
+       defer os.Remove(kf)
+       if err != nil {
+               t.Fatal(err)
+       }
+
+       params := map[string]string{
+               auth.ConfigParamType:      
auth.ConfigParamTypeClientCredentials,
+               auth.ConfigParamIssuerURL: server.URL,
+               auth.ConfigParamClientID:  "client-id",
+               auth.ConfigParamAudience:  "audience",
+               auth.ConfigParamKeyFile:   kf,
+       }
+
+       oauth := NewAuthenticationOAuth2(params)
+       client, err := NewClient(ClientOptions{
+               URL:            serviceURL,
+               Authentication: oauth,
+       })
+       assert.NoError(t, err)
+
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic: newAuthTopicName(),
+       })
+
+       assert.NoError(t, err)
+       assert.NotNil(t, producer)
+
+       client.Close()
+}
+
 func TestTopicPartitions(t *testing.T) {
        client, err := NewClient(ClientOptions{
                URL: "pulsar://localhost:6650",
diff --git a/pulsar/internal/auth/oauth2.go b/pulsar/internal/auth/oauth2.go
new file mode 100644
index 0000000..9ee63ab
--- /dev/null
+++ b/pulsar/internal/auth/oauth2.go
@@ -0,0 +1,145 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package auth
+
+import (
+       "crypto/tls"
+       "fmt"
+
+       "github.com/apache/pulsar-client-go/oauth2"
+       "github.com/apache/pulsar-client-go/oauth2/cache"
+       "github.com/apache/pulsar-client-go/oauth2/clock"
+       "github.com/apache/pulsar-client-go/oauth2/store"
+)
+
+const (
+       ConfigParamType                  = "type"
+       ConfigParamTypeClientCredentials = "client_credentials"
+       ConfigParamIssuerURL             = "issuerUrl"
+       ConfigParamAudience              = "audience"
+       ConfigParamKeyFile               = "privateKey"
+       ConfigParamClientID              = "clientId"
+)
+
+type oauth2AuthProvider struct {
+       clock  clock.Clock
+       issuer oauth2.Issuer
+       store  store.Store
+       source cache.CachingTokenSource
+}
+
+// NewAuthenticationOAuth2WithParams return a interface of Provider with 
string map.
+func NewAuthenticationOAuth2WithParams(params map[string]string) (Provider, 
error) {
+       issuer := oauth2.Issuer{
+               IssuerEndpoint: params[ConfigParamIssuerURL],
+               ClientID:       params[ConfigParamClientID],
+               Audience:       params[ConfigParamAudience],
+       }
+
+       // initialize a store of authorization grants
+       st := store.NewMemoryStore()
+       switch params[ConfigParamType] {
+       case ConfigParamTypeClientCredentials:
+               flow, err := 
oauth2.NewDefaultClientCredentialsFlow(oauth2.ClientCredentialsFlowOptions{
+                       KeyFile:          params[ConfigParamKeyFile],
+                       AdditionalScopes: nil,
+               })
+               if err != nil {
+                       return nil, err
+               }
+               grant, err := flow.Authorize(issuer.Audience)
+               if err != nil {
+                       return nil, err
+               }
+               err = st.SaveGrant(issuer.Audience, *grant)
+               if err != nil {
+                       return nil, err
+               }
+       default:
+               return nil, fmt.Errorf("unsupported authentication type: %s", 
params[ConfigParamType])
+       }
+
+       return NewAuthenticationOAuth2(issuer, st), nil
+}
+
+func NewAuthenticationOAuth2(
+       issuer oauth2.Issuer,
+       store store.Store) Provider {
+
+       return &oauth2AuthProvider{
+               clock:  clock.RealClock{},
+               issuer: issuer,
+               store:  store,
+       }
+}
+
+func (p *oauth2AuthProvider) Init() error {
+       grant, err := p.store.LoadGrant(p.issuer.Audience)
+       if err != nil {
+               if err == store.ErrNoAuthenticationData {
+                       return nil
+               }
+               return err
+       }
+       refresher, err := p.getRefresher(grant.Type)
+       if err != nil {
+               return err
+       }
+
+       source, err := cache.NewDefaultTokenCache(p.store, p.issuer.Audience, 
refresher)
+       if err != nil {
+               return err
+       }
+       p.source = source
+       return nil
+}
+
+func (p *oauth2AuthProvider) Name() string {
+       return "token"
+}
+
+func (p *oauth2AuthProvider) GetTLSCertificate() (*tls.Certificate, error) {
+       return nil, nil
+}
+
+func (p *oauth2AuthProvider) GetData() ([]byte, error) {
+       if p.source == nil {
+               // anonymous access
+               return nil, nil
+       }
+       token, err := p.source.Token()
+       if err != nil {
+               return nil, err
+       }
+       return []byte(token.AccessToken), nil
+}
+
+func (p *oauth2AuthProvider) Close() error {
+       return nil
+}
+
+func (p *oauth2AuthProvider) getRefresher(t oauth2.AuthorizationGrantType) 
(oauth2.AuthorizationGrantRefresher, error) {
+       switch t {
+       case oauth2.GrantTypeClientCredentials:
+               return oauth2.NewDefaultClientCredentialsGrantRefresher(p.clock)
+       case oauth2.GrantTypeDeviceCode:
+               return 
oauth2.NewDefaultDeviceAuthorizationGrantRefresher(p.clock)
+       default:
+               return nil, store.ErrUnsupportedAuthData
+       }
+}
diff --git a/pulsar/internal/auth/oauth2_test.go 
b/pulsar/internal/auth/oauth2_test.go
new file mode 100644
index 0000000..c8a4830
--- /dev/null
+++ b/pulsar/internal/auth/oauth2_test.go
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package auth
+
+import (
+       "fmt"
+       "io/ioutil"
+       "net/http"
+       "net/http/httptest"
+       "os"
+       "testing"
+
+       "github.com/pkg/errors"
+       "github.com/stretchr/testify/assert"
+)
+
+// mockOAuthServer will mock a oauth service for the tests
+func mockOAuthServer() *httptest.Server {
+       // prepare a port for the mocked server
+       server := httptest.NewUnstartedServer(http.DefaultServeMux)
+
+       // mock the used REST path for the tests
+       mockedHandler := http.NewServeMux()
+       mockedHandler.HandleFunc("/.well-known/openid-configuration", 
func(writer http.ResponseWriter, request *http.Request) {
+               s := fmt.Sprintf(`{
+    "issuer":"%s",
+    "authorization_endpoint":"%s/authorize",
+    "token_endpoint":"%s/oauth/token",
+    "device_authorization_endpoint":"%s/oauth/device/code"
+}`, server.URL, server.URL, server.URL, server.URL)
+               fmt.Fprintln(writer, s)
+       })
+       mockedHandler.HandleFunc("/oauth/token", func(writer 
http.ResponseWriter, request *http.Request) {
+               fmt.Fprintln(writer, "{\n  \"access_token\": 
\"token-content\",\n  \"token_type\": \"Bearer\"\n}")
+       })
+       mockedHandler.HandleFunc("/authorize", func(writer http.ResponseWriter, 
request *http.Request) {
+               fmt.Fprintln(writer, "true")
+       })
+
+       server.Config.Handler = mockedHandler
+       server.Start()
+
+       return server
+}
+
+// mockKeyFile will mock a temp key file for testing.
+func mockKeyFile(server string) (string, error) {
+       pwd, err := os.Getwd()
+       if err != nil {
+               return "", err
+       }
+       kf, err := ioutil.TempFile(pwd, "test_oauth2")
+       if err != nil {
+               return "", err
+       }
+       _, err = kf.WriteString(fmt.Sprintf(`{
+  "type":"sn_service_account",
+  "client_id":"client-id",
+  "client_secret":"client-secret",
+  "client_email":"[email protected]",
+  "issuer_url":"%s"
+}`, server))
+       if err != nil {
+               return "", err
+       }
+
+       return kf.Name(), nil
+}
+
+func TestNewAuthenticationOAuth2WithParams(t *testing.T) {
+       server := mockOAuthServer()
+       defer server.Close()
+       kf, err := mockKeyFile(server.URL)
+       defer os.Remove(kf)
+       if err != nil {
+               t.Fatal(errors.Wrap(err, "create mocked key file failed"))
+       }
+
+       params := map[string]string{
+               ConfigParamType:      ConfigParamTypeClientCredentials,
+               ConfigParamIssuerURL: server.URL,
+               ConfigParamClientID:  "client-id",
+               ConfigParamAudience:  "audience",
+               ConfigParamKeyFile:   kf,
+       }
+
+       auth, err := NewAuthenticationOAuth2WithParams(params)
+       if err != nil {
+               t.Fatal(err)
+       }
+       err = auth.Init()
+       if err != nil {
+               t.Fatal(err)
+       }
+
+       token, err := auth.GetData()
+       if err != nil {
+               t.Fatal(err)
+       }
+
+       assert.Equal(t, "token-content", string(token))
+}
diff --git a/pulsar/internal/auth/token.go b/pulsar/internal/auth/token.go
index e6a6d97..d8d0cd3 100644
--- a/pulsar/internal/auth/token.go
+++ b/pulsar/internal/auth/token.go
@@ -102,6 +102,6 @@ func (p *tokenAuthProvider) GetData() ([]byte, error) {
        return []byte(t), nil
 }
 
-func (tokenAuthProvider) Close() error {
+func (p *tokenAuthProvider) Close() error {
        return nil
 }

Reply via email to